You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:43:56 UTC
[08/51] [partial] incubator-asterixdb git commit: Change folder
structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
new file mode 100644
index 0000000..b45368a
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo.FeedJobState;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.PrepareStallMessage;
+import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedLoadManager implements IFeedLoadManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
+
+ private static final long MIN_MODIFICATION_INTERVAL = 180000; // 10 seconds
+ private final TreeSet<NodeLoadReport> nodeReports;
+ private final Map<FeedConnectionId, FeedActivity> feedActivities;
+ private final Map<String, Pair<Integer, Integer>> feedMetrics;
+
+ private FeedConnectionId lastModified;
+ private long lastModifiedTimestamp;
+
+ private static final int UNKNOWN = -1;
+
+ public FeedLoadManager() {
+ this.nodeReports = new TreeSet<NodeLoadReport>();
+ this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
+ this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
+ }
+
+ @Override
+ public void submitNodeLoadReport(NodeLoadReport report) {
+ nodeReports.remove(report);
+ nodeReports.add(report);
+ }
+
+ @Override
+ public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
+ FeedRuntimeId runtimeId = message.getRuntimeId();
+ FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+ if (jobState == null
+ || (jobState.equals(FeedJobState.UNDER_RECOVERY))
+ || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
+ - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring congestion report from " + runtimeId);
+ }
+ return;
+ } else {
+ try {
+ FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+ int inflowRate = message.getInflowRate();
+ int outflowRate = message.getOutflowRate();
+ List<String> currentComputeLocations = new ArrayList<String>();
+ currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
+ .getConnectionId().getFeedId()));
+ int computeCardinality = currentComputeLocations.size();
+ int requiredCardinality = (int) Math
+ .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
+ int additionalComputeNodes = requiredCardinality - computeCardinality;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
+ + additionalComputeNodes);
+ }
+
+ List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
+
+ // Step 1) Alter the original feed job to adjust the cardinality
+ JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+ .getConnectionId());
+ helperComputeNodes.addAll(currentComputeLocations);
+ List<String> newLocations = new ArrayList<String>();
+ newLocations.addAll(currentComputeLocations);
+ newLocations.addAll(helperComputeNodes);
+ FeedUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
+
+ // Step 2) send prepare to stall message
+ gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
+
+ // Step 3) run the altered job specification
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("New Job after adjusting to the workload " + jobSpec);
+ }
+
+ Thread.sleep(10000);
+ runJob(jobSpec, false);
+ lastModified = message.getConnectionId();
+ lastModifiedTimestamp = System.currentTimeMillis();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
+ }
+ throw new AsterixException(e);
+ }
+ }
+ }
+
+ @Override
+ public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
+ FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+ if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
+ }
+ return;
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Processing scale-in message " + message);
+ }
+ FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+ JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+ .getConnectionId());
+ int reducedCardinality = message.getReducedCardinaliy();
+ List<String> currentComputeLocations = new ArrayList<String>();
+ currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
+ .getFeedId()));
+ FeedUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
+ currentComputeLocations);
+
+ gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
+ Thread.sleep(3000);
+ JobId newJobId = runJob(jobSpec, false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
+ }
+
+ }
+ }
+
+ private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
+ throws Exception {
+ // Step 1) send prepare to stall message
+ PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
+ List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+ List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
+ List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+
+ Set<String> operatorLocations = new HashSet<String>();
+
+ operatorLocations.addAll(intakeLocations);
+ operatorLocations.addAll(computeLocations);
+ operatorLocations.addAll(storageLocations);
+
+ JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
+ runJob(messageJobSpec, true);
+
+ // Step 2)
+ TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
+ messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
+ runJob(messageJobSpec, true);
+ }
+
+ public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobId jobId = hcc.startJob(spec);
+ if (waitForCompletion) {
+ hcc.waitForCompletion(jobId);
+ }
+ return jobId;
+ }
+
+ @Override
+ public void submitFeedRuntimeReport(FeedReportMessage report) {
+ String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
+ Pair<Integer, Integer> value = feedMetrics.get(key);
+ if (value == null) {
+ value = new Pair<Integer, Integer>(report.getValue(), 1);
+ feedMetrics.put(key, value);
+ } else {
+ value.first = value.first + report.getValue();
+ value.second = value.second + 1;
+ }
+ }
+
+ @Override
+ public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
+ int rVal;
+ String key = "" + connectionId + ":" + runtimeType;
+ feedMetrics.get(key);
+ Pair<Integer, Integer> value = feedMetrics.get(key);
+ if (value == null) {
+ rVal = UNKNOWN;
+ } else {
+ rVal = value.first / value.second;
+ }
+ return rVal;
+ }
+
+ private List<String> getNodeForSubstitution(int nRequired) {
+ List<String> nodeIds = new ArrayList<String>();
+ Iterator<NodeLoadReport> it = null;
+ int nAdded = 0;
+ while (nAdded < nRequired) {
+ it = nodeReports.iterator();
+ while (it.hasNext()) {
+ nodeIds.add(it.next().getNodeId());
+ nAdded++;
+ }
+ }
+ return nodeIds;
+ }
+
+ @Override
+ public synchronized List<String> getNodes(int required) {
+ Iterator<NodeLoadReport> it;
+ List<String> allocated = new ArrayList<String>();
+ while (allocated.size() < required) {
+ it = nodeReports.iterator();
+ while (it.hasNext() && allocated.size() < required) {
+ allocated.add(it.next().getNodeId());
+ }
+ }
+ return allocated;
+ }
+
+ @Override
+ public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
+ System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
+ FeedConnectionId connectionId = mesg.getConnectionId();
+ List<String> destinationLocations = new ArrayList<String>();
+ List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+ List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+
+ destinationLocations.addAll(storageLocations);
+ destinationLocations.addAll(collectLocations);
+ JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
+ destinationLocations);
+ runJob(messageJobSpec, true);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
+ }
+ IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
+ trackingManager.disableAcking(connectionId);
+ }
+
+ @Override
+ public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
+ feedActivities.put(connectionId, activity);
+ }
+
+ @Override
+ public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
+ return feedActivities.get(connectionId);
+ }
+
+ @Override
+ public Collection<FeedActivity> getFeedActivities() {
+ return feedActivities.values();
+ }
+
+ @Override
+ public void removeFeedActivity(FeedConnectionId connectionId) {
+ feedActivities.remove(connectionId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
new file mode 100644
index 0000000..e3020aa
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.FeedMessageService;
+import edu.uci.ics.asterix.common.feeds.FeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReportService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedSubscriptionManager;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of the IFeedManager interface.
+ * Provides the necessary central repository for registering/retrieving
+ * artifacts/services associated with a feed.
+ */
+public class FeedManager implements IFeedManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
+
+    private final IFeedSubscriptionManager feedSubscriptionManager;
+
+    private final IFeedConnectionManager feedConnectionManager;
+
+    private final IFeedMemoryManager feedMemoryManager;
+
+    private final IFeedMetricCollector feedMetricCollector;
+
+    private final IFeedMetadataManager feedMetadataManager;
+
+    private final IFeedMessageService feedMessageService;
+
+    private final NodeLoadReportService nodeLoadReportService;
+
+    private final AsterixFeedProperties asterixFeedProperties;
+
+    private final String nodeId;
+
+    private final int frameSize;
+
+    /**
+     * Creates the per-node feed managers/services and starts the feed message service and the node load
+     * report service. A failure to start these services is logged and does not fail construction.
+     *
+     * @param nodeId
+     *            id of the node this manager belongs to
+     * @param feedProperties
+     *            feed configuration
+     * @param frameSize
+     *            frame size handed to the feed memory manager
+     */
+    public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) throws AsterixException, HyracksDataException {
+        // Assign every plain field first: "this" is handed to NodeLoadReportService below, and the started
+        // services must not observe a partially initialized manager.
+        this.nodeId = nodeId;
+        this.frameSize = frameSize;
+        this.asterixFeedProperties = feedProperties;
+        this.feedSubscriptionManager = new FeedSubscriptionManager(nodeId);
+        this.feedConnectionManager = new FeedConnectionManager(nodeId);
+        this.feedMetadataManager = new FeedMetadataManager(nodeId);
+        this.feedMemoryManager = new FeedMemoryManager(nodeId, feedProperties, frameSize);
+        this.feedMetricCollector = new FeedMetricCollector(nodeId);
+        // Fall back to localhost when no cluster description is available.
+        String ccClusterIp = AsterixClusterProperties.INSTANCE.getCluster() != null ? AsterixClusterProperties.INSTANCE
+                .getCluster().getMasterNode().getClusterIp() : "localhost";
+        this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
+        this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+        try {
+            this.feedMessageService.start();
+            this.nodeLoadReportService.start();
+        } catch (Exception e) {
+            // Best effort: the manager stays usable even if the services could not be started.
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Unable to start feed services " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public IFeedSubscriptionManager getFeedSubscriptionManager() {
+        return feedSubscriptionManager;
+    }
+
+    @Override
+    public IFeedConnectionManager getFeedConnectionManager() {
+        return feedConnectionManager;
+    }
+
+    @Override
+    public IFeedMemoryManager getFeedMemoryManager() {
+        return feedMemoryManager;
+    }
+
+    @Override
+    public IFeedMetricCollector getFeedMetricCollector() {
+        return feedMetricCollector;
+    }
+
+    /** @return the frame size this manager was created with */
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    @Override
+    public IFeedMetadataManager getFeedMetadataManager() {
+        return feedMetadataManager;
+    }
+
+    @Override
+    public IFeedMessageService getFeedMessageService() {
+        return feedMessageService;
+    }
+
+    @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        return "FeedManager " + "[" + nodeId + "]";
+    }
+
+    @Override
+    public AsterixFeedProperties getAsterixFeedProperties() {
+        return asterixFeedProperties;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
new file mode 100644
index 0000000..8530370
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.logging.Level;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.MessageReceiver;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage.MessageType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.feeds.CentralFeedManager.AQLExecutor;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+
+/**
+ * Receives feed messages as JSON strings and dispatches them, based on their message type, to the feed
+ * load manager, the feed tracking manager, the feed lifecycle listener or the AQL executor.
+ */
+public class FeedMessageReceiver extends MessageReceiver<String> {
+
+    // Set once the initial feed artifacts have been set up by the first XAQL message.
+    private static boolean initialized;
+
+    private final IFeedLoadManager feedLoadManager;
+    private final IFeedTrackingManager feedTrackingManager;
+
+    public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
+        this.feedLoadManager = centralFeedManager.getFeedLoadManager();
+        this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
+    }
+
+    /**
+     * Parses the message as JSON and forwards it to the handler registered for its message type.
+     * Message types without a handler are ignored.
+     *
+     * @param message
+     *            the JSON representation of a feed message
+     * @throws Exception
+     *             if the message cannot be parsed or its handler fails
+     */
+    @Override
+    public void processMessage(String message) throws Exception {
+        JSONObject obj = new JSONObject(message);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Received message " + obj);
+        }
+        MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
+        switch (messageType) {
+            case XAQL:
+                // Lazily create the initial artifacts before the first AQL statement is executed.
+                if (!initialized) {
+                    FeedBootstrap.setUpInitialArtifacts();
+                    initialized = true;
+                }
+                AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
+                break;
+            case CONGESTION:
+                feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
+                break;
+            case FEED_REPORT:
+                feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
+                break;
+            case NODE_REPORT:
+                feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
+                break;
+            case SCALE_IN_REQUEST:
+                feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
+                break;
+            case STORAGE_REPORT:
+                FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
+                break;
+            case COMMIT_ACK:
+                feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
+                break;
+            case THROTTLING_ENABLED:
+                feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
+                break;
+            default:
+                // No handler for the remaining message types.
+                break;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
new file mode 100644
index 0000000..b927c90
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+import edu.uci.ics.asterix.metadata.feeds.XAQLFeedMessage;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Logs tuples that could not be processed by a feed: each tuple is turned into an AQL insert statement
+ * against the failed-tuple dataset and sent through the feed message service.
+ */
+public class FeedMetadataManager implements IFeedMetadataManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
+
+    private final String nodeId;
+    // Record type of an entry in the failed-tuple dataset; all fields are strings.
+    private final ARecordType recordType;
+
+    public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
+        this.nodeId = nodeId;
+        String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
+                "timestamp" };
+        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+        recordType = new ARecordType(FeedBootstrap.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
+    }
+
+    /**
+     * Builds an AQL insert statement describing the given tuple and sends it as an XAQL feed message.
+     *
+     * @param connectionId
+     *            the feed connection the tuple belongs to
+     * @param tuple
+     *            textual form of the tuple
+     * @param message
+     *            description of why the tuple is being logged
+     * @param feedManager
+     *            provides the message service used to send the statement
+     * @throws AsterixException
+     *             wrapping any failure while building or sending the statement
+     */
+    @Override
+    public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
+            throws AsterixException {
+        try {
+            // NOTE(review): the id is the constant "1" for every logged tuple; confirm this is intended.
+            AString id = new AString("1");
+            AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
+            AString feedValue = new AString(connectionId.getFeedId().getFeedName());
+            AString targetDatasetValue = new AString(connectionId.getDatasetName());
+            AString tupleValue = new AString(tuple);
+            AString messageValue = new AString(message);
+            AString dateTime = new AString(new Date().toString());
+
+            IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
+                    messageValue, dateTime };
+            ARecord record = new ARecord(recordType, fields);
+            StringBuilder builder = new StringBuilder();
+            builder.append("use dataverse " + FeedBootstrap.FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("insert into dataset " + FeedBootstrap.FAILED_TUPLE_DATASET + " ");
+            builder.append(" (" + recordToString(record) + ")");
+            builder.append(";");
+
+            XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
+            feedManager.getFeedMessageService().sendMessage(xAqlMessage);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(" Sent " + xAqlMessage.toJSON());
+            }
+        } catch (Exception pe) {
+            throw new AsterixException(pe);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "FeedMetadataManager [" + nodeId + "]";
+    }
+
+    /**
+     * Renders the record as <code>{ "field": "value", ... }</code>. String values are escaped so that
+     * quotes or backslashes inside a value cannot terminate the string literal of the generated statement.
+     *
+     * @throws IllegalStateException
+     *             if the record contains a field that is not of type string
+     */
+    private String recordToString(ARecord record) {
+        String[] fieldNames = record.getType().getFieldNames();
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ ");
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append("\"" + fieldNames[i] + "\"");
+            sb.append(": ");
+            switch (record.getType().getFieldTypes()[i].getTypeTag()) {
+                case STRING:
+                    sb.append("\"" + escapeStringLiteral(((AString) record.getValueByPos(i)).getStringValue())
+                            + "\"");
+                    break;
+                default:
+                    // Emitting nothing here would produce a malformed record ("name": , ...).
+                    throw new IllegalStateException("Unsupported type for field " + fieldNames[i]);
+            }
+        }
+        sb.append(" }");
+        return sb.toString();
+    }
+
+    /**
+     * Escapes backslashes and double quotes; a {@code null} value is rendered as the text {@code null},
+     * which is what plain string concatenation would have produced.
+     */
+    private static String escapeStringLiteral(String value) {
+        if (value == null) {
+            return "null";
+        }
+        // Backslashes first, so the backslashes introduced for quotes are not escaped again.
+        return value.replace("\\", "\\\\").replace("\"", "\\\"");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
new file mode 100644
index 0000000..3e5a1a4
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedTrackingManager implements IFeedTrackingManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
+
+ private final BitSet allOnes;
+
+ private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
+ private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
+
+ public FeedTrackingManager() {
+ byte[] allOneBytes = new byte[128];
+ Arrays.fill(allOneBytes, (byte) 0xff);
+ allOnes = BitSet.valueOf(allOneBytes);
+ ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
+ maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
+ }
+
+ @Override
+ public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
+ AckId ackId = getAckId(ackMessage);
+ Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
+ if (acksForConnection == null) {
+ acksForConnection = new HashMap<AckId, BitSet>();
+ acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
+ ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
+ }
+ BitSet currentAcks = acksForConnection.get(ackId);
+ if (currentAcks == null) {
+ currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
+ acksForConnection.put(ackId, currentAcks);
+ } else {
+ currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
+ }
+ if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
+ }
+ Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
+ if (maxBaseAckedForConnection == null) {
+ maxBaseAckedForConnection = new HashMap<AckId, Integer>();
+ maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
+ }
+ Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
+ if (maxBaseAckedValue == null) {
+ maxBaseAckedValue = ackMessage.getBase();
+ maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+ sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+ ackMessage.getBase());
+ } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
+ maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+ sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+ ackMessage.getBase());
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
+ }
+ }
+
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
+ }
+ }
+ }
+
+ public synchronized void disableTracking(FeedConnectionId connectionId) {
+ ackHistory.remove(connectionId);
+ maxBaseAcked.remove(connectionId);
+ }
+
+ private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
+ FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
+ List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+ List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+ String collectLocation = collectLocations.get(partition);
+ Set<String> messageDestinations = new HashSet<String>();
+ messageDestinations.add(collectLocation);
+ messageDestinations.addAll(storageLocations);
+ try {
+ JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
+ CentralFeedManager.runJob(spec, false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
+ }
+ }
+ }
+
+ private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
+ return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
+ }
+
+ private static class AckId {
+ private FeedConnectionId connectionId;
+ private int intakePartition;
+ private int base;
+
+ public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
+ this.connectionId = connectionId;
+ this.intakePartition = intakePartition;
+ this.base = base;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AckId)) {
+ return false;
+ }
+ AckId other = (AckId) o;
+ return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
+ && other.getBase() == base;
+ }
+
+ @Override
+ public String toString() {
+ return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public int getIntakePartition() {
+ return intakePartition;
+ }
+
+ public int getBase() {
+ return base;
+ }
+
+ }
+
+ @Override
+ public void disableAcking(FeedConnectionId connectionId) {
+ ackHistory.remove(connectionId);
+ maxBaseAcked.remove(connectionId);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Acking disabled for " + connectionId);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
new file mode 100644
index 0000000..ba54406
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedWorkRequestResponseHandler implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
+
+    /** Queue from which {@link #run()} takes the cluster-management responses to process. */
+    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
+
+    /**
+     * Impacted feed jobs per pending work id; the inner map is keyed by the id of the failed node.
+     * Entries are written by {@code registerFeedWork} while {@link #run()} loops forever on its own
+     * thread, so the map has to tolerate concurrent access.
+     */
+    private final Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new ConcurrentHashMap<Integer, Map<String, List<FeedJobInfo>>>();
+
+    /**
+     * @param inbox the queue on which responses to submitted cluster-management work arrive
+     */
+    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
+        this.inbox = inbox;
+    }
+
+    /**
+     * Processes cluster-management responses from the inbox until the thread is interrupted.
+     * <p>
+     * For an ADD_NODE response every dead node is mapped to a substitute node, the impacted feed
+     * job specifications are rewritten to use the substitute, and the revised jobs are resubmitted.
+     *
+     * @throws IllegalStateException if a REMOVE_NODE response is received
+     */
+    @Override
+    public void run() {
+        while (true) {
+            IClusterManagementWorkResponse response;
+            try {
+                response = inbox.take();
+            } catch (InterruptedException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Interrupted exception " + e.getMessage());
+                }
+                // Nothing was taken from the inbox, so there is no response to process. Restore the
+                // interrupt status and stop instead of dereferencing a null response.
+                Thread.currentThread().interrupt();
+                return;
+            }
+            IClusterManagementWork submittedWork = response.getWork();
+            switch (submittedWork.getClusterManagementWorkType()) {
+                case ADD_NODE:
+                    handleAddNodeResponse((AddNodeWork) submittedWork, (AddNodeWorkResponse) response);
+                    break;
+                case REMOVE_NODE:
+                    throw new IllegalStateException("Invalid work submitted");
+            }
+        }
+    }
+
+    /** Rewrites and restarts the feed jobs that were registered for the work answered by {@code resp}. */
+    private void handleAddNodeResponse(AddNodeWork addNodeWork, AddNodeWorkResponse resp) {
+        int workId = addNodeWork.getWorkId();
+        Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
+        if (failureAnalysis == null) {
+            // No jobs were registered for this work id, so there is nothing to revise.
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("No feed jobs registered for work " + workId + ", ignoring response");
+            }
+            return;
+        }
+
+        Map<String, String> nodeSubstitution = mapFailedNodesToSubstitutes(addNodeWork, resp);
+
+        // alter failed feed jobs and sort them by type for the restart
+        Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
+        Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
+        for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
+            String failedNode = entry.getKey();
+            String substitute = nodeSubstitution.get(failedNode);
+            if (substitute == null) {
+                // Rewriting a job to run on a null location cannot succeed; leave these jobs alone.
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("No substitute found for lost node " + failedNode
+                            + ", impacted feed jobs are not restarted");
+                }
+                continue;
+            }
+            for (FeedJobInfo info : entry.getValue()) {
+                JobSpecification spec = info.getSpec();
+                replaceNode(spec, failedNode, substitute);
+                info.setSpec(spec);
+                switch (info.getJobType()) {
+                    case INTAKE:
+                        revisedIntakeJobs.add((FeedIntakeInfo) info);
+                        break;
+                    case FEED_CONNECT:
+                        revisedConnectJobInfos.add((FeedConnectJobInfo) info);
+                        break;
+                }
+            }
+        }
+
+        restartRevisedJobs(revisedIntakeJobs, revisedConnectJobInfos);
+    }
+
+    /**
+     * Forms a mapping between each failed node and its substitute: newly added nodes are used
+     * round-robin when there are any, otherwise the current participant nodes are used round-robin.
+     */
+    private Map<String, String> mapFailedNodesToSubstitutes(AddNodeWork addNodeWork, AddNodeWorkResponse resp) {
+        Map<String, String> nodeSubstitution = new HashMap<String, String>();
+        List<String> nodesAdded = resp.getNodesAdded();
+        List<String> unsubstitutedNodes = new ArrayList<String>();
+        unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
+        int nodeIndex = 0;
+
+        if (nodesAdded != null && nodesAdded.size() > 0) {
+            for (String failedNodeId : addNodeWork.getDeadNodes()) {
+                String substitute = nodesAdded.get(nodeIndex);
+                nodeSubstitution.put(failedNodeId, substitute);
+                nodeIndex = (nodeIndex + 1) % nodesAdded.size();
+                unsubstitutedNodes.remove(failedNodeId);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
+                }
+            }
+        }
+        if (unsubstitutedNodes.size() > 0) {
+            String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(
+                    new String[] {});
+            if (participantNodes.length == 0) {
+                // Indexing into an empty array would throw and kill the handler thread.
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("No participant nodes available to substitute lost nodes " + unsubstitutedNodes);
+                }
+                return nodeSubstitution;
+            }
+            nodeIndex = 0;
+            for (String unsubstitutedNode : unsubstitutedNodes) {
+                nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
+                            + unsubstitutedNode);
+                }
+                nodeIndex = (nodeIndex + 1) % participantNodes.length;
+            }
+
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
+            }
+        }
+        return nodeSubstitution;
+    }
+
+    /** Submits the revised intake jobs first, then the revised connect jobs, pausing in between. */
+    private void restartRevisedJobs(Set<FeedIntakeInfo> revisedIntakeJobs,
+            Set<FeedConnectJobInfo> revisedConnectJobInfos) {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        try {
+            for (FeedIntakeInfo info : revisedIntakeJobs) {
+                hcc.startJob(info.getSpec());
+            }
+            Thread.sleep(2000);
+            for (FeedConnectJobInfo info : revisedConnectJobInfos) {
+                hcc.startJob(info.getSpec());
+                Thread.sleep(2000);
+            }
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible so that the next inbox.take() in run() ends the loop.
+            Thread.currentThread().interrupt();
+            LOGGER.log(Level.WARNING, "Interrupted while starting revised jobs post failure", e);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Unable to start revised job post failure", e);
+        }
+    }
+
+ /**
+ * Rewrites the location constraints of {@code jobSpec} so that partitions placed on
+ * {@code failedNodeId} are placed on {@code replacementNode} instead.
+ * <p>
+ * Works in three phases: (1) scan the user constraints and classify them, (2) remove every
+ * count and location constraint of each operator that had a partition on the failed node, and
+ * (3) add an absolute location constraint for each such operator using the revised locations.
+ *
+ * @param jobSpec the job specification whose user constraints are modified in place
+ * @param failedNodeId id of the node whose partition locations must be replaced
+ * @param replacementNode id of the node that takes over those partitions
+ */
+ private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
+ Set<Constraint> userConstraints = jobSpec.getUserConstraints();
+ // constraints that are removed from the job specification after the scan
+ List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+ List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+ // operators with at least one partition on the failed node (may contain duplicates)
+ List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+ // constraints seen before their operator was known to be modified; revisited after the scan
+ Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+ // revised partition -> location mapping for every modified operator
+ Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+ OperatorDescriptorId opId = null;
+ // Phase 1: classify each user constraint. The outcome for a single constraint depends on
+ // whether its operator was already marked as modified, i.e. on the iteration order of the set;
+ // constraints that cannot be decided yet are parked in candidateConstraints.
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ if (modifiedOperators.contains(opId)) {
+ countConstraintsToReplace.add(constraint);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ if (oldLocation.equals(failedNodeId)) {
+ // partition lives on the failed node: mark the operator and record the substitute
+ locationConstraintsToReplace.add(constraint);
+ modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+ Map<Integer, String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new HashMap<Integer, String>();
+ newConstraints.put(opId, newLocs);
+ }
+ int partition = ((PartitionLocationExpression) lexpr).getPartition();
+ newLocs.put(partition, replacementNode);
+ } else {
+ if (modifiedOperators.contains(opId)) {
+ // healthy partition of an operator already marked: keep its location in the new mapping
+ locationConstraintsToReplace.add(constraint);
+ Map<Integer, String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new HashMap<Integer, String>();
+ newConstraints.put(opId, newLocs);
+ }
+ int partition = ((PartitionLocationExpression) lexpr).getPartition();
+ newLocs.put(partition, oldLocation);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ }
+ break;
+ }
+ }
+
+ // Phase 2: drop the constraints that are being replaced.
+ jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+ jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+ // Parked constraints of modified operators are dropped as well; the location ones
+ // contribute their unchanged location to the new mapping.
+ for (OperatorDescriptorId mopId : modifiedOperators) {
+ List<Constraint> clist = candidateConstraints.get(mopId);
+ if (clist != null && !clist.isEmpty()) {
+ jobSpec.getUserConstraints().removeAll(clist);
+
+ for (Constraint c : clist) {
+ if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+ ConstraintExpression cexpr = c.getRValue();
+ int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ newConstraints.get(mopId).put(partition, oldLocation);
+ }
+ }
+ }
+ }
+
+ // Phase 3: add an absolute location constraint per modified operator.
+ for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
+ OperatorDescriptorId nopId = entry.getKey();
+ Map<Integer, String> clist = entry.getValue();
+ IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+ String[] locations = new String[clist.size()];
+ // assumes the recorded partitions are exactly 0..size-1; a gap would leave a null entry - TODO confirm
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = clist.get(i);
+ }
+ // presumably re-creates the count constraint along with the locations - verify against PartitionConstraintHelper
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
+ }
+
+ }
+
+ public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
+ feedsWaitingForResponse.put(workId, impactedJobs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
new file mode 100644
index 0000000..0e7ec5b
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class FeedsActivator implements Runnable {
+
+    // Log under this class's own name (the logger was previously created for FeedJobNotificationHandler).
+    private static final Logger LOGGER = Logger.getLogger(FeedsActivator.class.getName());
+
+    /** Feeds to restart; null unless constructed for {@link Mode#REVIVAL_POST_NODE_REJOIN}. */
+    private final List<FeedCollectInfo> feedsToRevive;
+    private final Mode mode;
+
+    /** The situation in which feeds are being revived. */
+    public enum Mode {
+        REVIVAL_POST_CLUSTER_REBOOT,
+        REVIVAL_POST_NODE_REJOIN
+    }
+
+    /** Creates an activator in {@link Mode#REVIVAL_POST_CLUSTER_REBOOT} mode. */
+    public FeedsActivator() {
+        this.feedsToRevive = null;
+        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+    }
+
+    /**
+     * Creates an activator in {@link Mode#REVIVAL_POST_NODE_REJOIN} mode.
+     *
+     * @param feedsToRevive the feeds whose jobs are resubmitted by {@link #run()}
+     */
+    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
+        this.feedsToRevive = feedsToRevive;
+        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+    }
+
+    /**
+     * Revives feeds according to the mode chosen at construction time.
+     * <p>
+     * In {@link Mode#REVIVAL_POST_NODE_REJOIN} mode the method waits ten seconds and then submits
+     * the job of every feed to revive; a failure to start one feed is logged and does not stop the
+     * remaining ones. {@link Mode#REVIVAL_POST_CLUSTER_REBOOT} currently does nothing.
+     *
+     * @throws IllegalStateException if the thread is interrupted while waiting
+     */
+    @Override
+    public void run() {
+        switch (mode) {
+            case REVIVAL_POST_CLUSTER_REBOOT:
+                // revival after a cluster reboot is not performed here
+                break;
+            case REVIVAL_POST_NODE_REJOIN:
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e1) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Attempt to resume feed interrupted");
+                    }
+                    // keep the interrupt visible to the caller and preserve the cause
+                    Thread.currentThread().interrupt();
+                    throw new IllegalStateException(e1.getMessage(), e1);
+                }
+                for (FeedCollectInfo finfo : feedsToRevive) {
+                    try {
+                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+                            LOGGER.info("Job:" + finfo.jobSpec);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.log(Level.WARNING,
+                                    "Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage(), e);
+                        }
+                    }
+                }
+        }
+    }
+
+    /**
+     * Reconnects a feed to a dataset by compiling and executing a forced connect-feed statement in
+     * the given dataverse. A failure is logged and not propagated.
+     *
+     * @param dataverse the dataverse containing the feed and the dataset
+     * @param feedName the feed to connect
+     * @param dataset the target dataset
+     * @param feedPolicy the ingestion policy to connect with
+     */
+    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+        // The writer wraps System.out and is deliberately left open.
+        PrintWriter writer = new PrintWriter(System.out, true);
+        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+        try {
+            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
+                    new Identifier(dataset), feedPolicy, 0);
+            stmt.setForceConnect(true);
+            List<Statement> statements = new ArrayList<Statement>();
+            statements.add(dataverseDecl);
+            statements.add(stmt);
+            AqlTranslator translator = new AqlTranslator(statements, pc);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                    AqlTranslator.ResultDelivery.SYNC);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+            }
+        } catch (Exception e) {
+            // Report the failure through the logger, with its stack trace, instead of printStackTrace().
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Exception in resuming loser feed: " + dataverse + ":" + dataset
+                        + " using policy " + feedPolicy + " Exception " + e.getMessage(), e);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
new file mode 100644
index 0000000..d476eed
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.file;
+
+import java.io.File;
+import java.rmi.RemoteException;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+public class DatasetOperations {
+
+ private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+
+ /**
+ * Builds the job that drops the primary index of the dataset named by {@code datasetDropStmt}.
+ * <p>
+ * The dataverse name is taken from the statement and falls back to the provider's default
+ * dataverse. For an external dataset a freshly created job specification is returned without
+ * any operator added by this method.
+ *
+ * @param datasetDropStmt the compiled drop statement naming the dataset (and optionally the dataverse)
+ * @param metadataProvider used to look up the dataset, its item type and its file splits
+ * @return the job specification whose root is the index drop operator
+ * @throws AlgebricksException if no metadata is found for the dataset
+ * @throws AsterixException if the data format class of the dataverse cannot be instantiated
+ */
+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
+ ACIDException, AsterixException {
+
+ // NOTE(review): dataverseName stays null when neither the statement nor the provider supplies one - confirm callers
+ String dataverseName = null;
+ if (datasetDropStmt.getDataverseName() != null) {
+ dataverseName = datasetDropStmt.getDataverseName();
+ } else if (metadataProvider.getDefaultDataverse() != null) {
+ dataverseName = metadataProvider.getDefaultDataverse().getDataverseName();
+ }
+
+ String datasetName = datasetDropStmt.getDatasetName();
+ // only used for the log message below
+ String datasetPath = dataverseName + File.separator + datasetName;
+
+ LOGGER.info("DROP DATASETPATH: " + datasetPath);
+
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+ }
+ // no index drop operator is built for an external dataset
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ return JobSpecificationUtils.createJobSpecification();
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+
+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+ dataverseName);
+ // the data format is instantiated reflectively from the class name stored with the dataverse
+ IDataFormat format;
+ try {
+ format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+
+ ARecordType itemType = (ARecordType) metadataProvider.findType(dataverseName, dataset.getItemTypeName());
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+ JobSpecification specPrimary = JobSpecificationUtils.createJobSpecification();
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
+ temp);
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+ metadataProvider.getMetadataTxnContext());
+
+ // The index drop operation should be persistent regardless of temp datasets or permanent dataset
+ IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+ filterCmpFactories, btreeFields, filterFields, true));
+ // place the drop operator according to the partition constraint of the dataset's splits
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+ splitsAndConstraint.second);
+
+ specPrimary.addRoot(primaryBtreeDrop);
+
+ return specPrimary;
+ }
+
+ /**
+ * Builds the job that creates the primary index of a dataset.
+ *
+ * @param dataverse the dataverse containing the dataset; also supplies the data format class name
+ * @param datasetName the name of the dataset whose primary index is created
+ * @param metadata used to look up the dataset, its item type and its file splits
+ * @return the job specification whose root is the index create operator
+ * @throws AsterixException if the data format cannot be instantiated or the dataset is not found
+ */
+ public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+ AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
+ String dataverseName = dataverse.getDataverseName();
+ // the data format is instantiated reflectively from the class name stored with the dataverse
+ IDataFormat format;
+ try {
+ format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+ Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ ARecordType itemType = (ARecordType) metadata.findType(dataverseName, dataset.getItemTypeName());
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+ int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, format.getBinaryComparatorFactoryProvider());
+ int[] filterFields = DatasetUtils.createFilterFields(dataset);
+ int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+ // log the node and local file of every split the index is created on
+ FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < fs.length; i++) {
+ sb.append(stringOf(fs[i]) + " ");
+ }
+ LOGGER.info("CREATING File Splits: " + sb.toString());
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+ metadata.getMetadataTxnContext());
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+ ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
+ comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
+ compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+ ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+ localResourceMetadata, LocalResource.LSMBTreeResource);
+
+ // The index create operation should be persistent regardless of temp datasets or permanent dataset
+ TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(dataset
+ .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+ .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, true), localResourceFactoryProvider,
+ NoOpOperationCallbackFactory.INSTANCE);
+ // place the create operator according to the partition constraint of the dataset's splits
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
+ splitsAndConstraint.second);
+ spec.addRoot(indexCreateOp);
+ return spec;
+ }
+
+    /**
+     * Renders a file split as {@code nodeName:localFile} for log output.
+     *
+     * @param fs the file split to render
+     * @return the node name and the local file of {@code fs}, separated by a colon
+     */
+    private static String stringOf(FileSplit fs) {
+        StringBuilder rendered = new StringBuilder();
+        rendered.append(fs.getNodeName());
+        rendered.append(":");
+        rendered.append(fs.getLocalFile().toString());
+        return rendered.toString();
+    }
+
+    /**
+     * Builds the job that compacts the primary index of a dataset.
+     *
+     * @param dataverse the dataverse containing the dataset; also supplies the data format class name
+     * @param datasetName the name of the dataset whose primary index is compacted
+     * @param metadata used to look up the dataset, its item type and its file splits
+     * @return the job specification whose root is the compact operator
+     * @throws AsterixException if the data format cannot be instantiated or the dataset is not found
+     */
+    public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
+            AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
+        String dataverseName = dataverse.getDataverseName();
+        // the data format is instantiated reflectively from the class name stored with the dataverse
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+
+        ARecordType itemType = (ARecordType) metadata.findType(dataverseName, dataset.getItemTypeName());
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+        int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
+
+        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        int[] filterFields = DatasetUtils.createFilterFields(dataset);
+        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadata.getMetadataTxnContext());
+        // Unlike the create and drop jobs, the last helper-factory argument here is !temp rather than true.
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+                        filterCmpFactories, btreeFields, filterFields, !temp), NoOpOperationCallbackFactory.INSTANCE);
+        // The partition constraint is registered once; the identical second registration was redundant.
+        AlgebricksPartitionConstraintHelper
+                .setPartitionConstraintInJobSpec(spec, compactOp, splitsAndConstraint.second);
+        spec.addRoot(compactOp);
+        return spec;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
new file mode 100644
index 0000000..997893f
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+/**
+ * Builds the job specifications for dataverse-level storage operations.
+ */
+public class DataverseOperations {
+    /**
+     * Builds the job that removes the files of a dataverse.
+     *
+     * @param dataverse the dataverse to drop
+     * @param metadata supplies the file splits and the partition constraint of the dataverse
+     * @return the job specification whose root is the file remove operator
+     */
+    public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
+        JobSpecification dropSpec = JobSpecificationUtils.createJobSpecification();
+        String dataverseName = dataverse.getDataverseName();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesAndPlacement = metadata
+                .splitProviderAndPartitionConstraintsForDataverse(dataverseName);
+        FileRemoveOperatorDescriptor removeFilesOp = new FileRemoveOperatorDescriptor(dropSpec,
+                filesAndPlacement.first);
+        // run the remove operator on the partitions that hold the dataverse files
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(dropSpec, removeFilesOp,
+                filesAndPlacement.second);
+        dropSpec.addRoot(removeFilesOp);
+        return dropSpec;
+    }
+}