You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:18 UTC
[21/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java
new file mode 100644
index 0000000..bd0c09b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+import edu.uci.ics.asterix.metadata.feeds.XAQLFeedMessage;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedMetadataManager implements IFeedMetadataManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
+
+ private final String nodeId;
+ private ARecordType recordType;
+
+ public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
+ this.nodeId = nodeId;
+ String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
+ "timestamp" };
+ IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+ recordType = new ARecordType(FeedBootstrap.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
+ }
+
+ @Override
+ public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
+ throws AsterixException {
+ try {
+ AString id = new AString("1");
+ AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
+ AString feedValue = new AString(connectionId.getFeedId().getFeedName());
+ AString targetDatasetValue = new AString(connectionId.getDatasetName());
+ AString tupleValue = new AString(tuple);
+ AString messageValue = new AString(message);
+ AString dateTime = new AString(new Date().toString());
+
+ IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
+ messageValue, dateTime };
+ ARecord record = new ARecord(recordType, fields);
+ StringBuilder builder = new StringBuilder();
+ builder.append("use dataverse " + FeedBootstrap.FEEDS_METADATA_DV + ";" + "\n");
+ builder.append("insert into dataset " + FeedBootstrap.FAILED_TUPLE_DATASET + " ");
+ builder.append(" (" + recordToString(record) + ")");
+ builder.append(";");
+
+ XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
+ feedManager.getFeedMessageService().sendMessage(xAqlMessage);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Sent " + xAqlMessage.toJSON());
+ }
+ } catch (Exception pe) {
+ throw new AsterixException(pe);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FeedMetadataManager [" + nodeId + "]";
+ }
+
+ private String recordToString(ARecord record) {
+ String[] fieldNames = record.getType().getFieldNames();
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ ");
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append("\"" + fieldNames[i] + "\"");
+ sb.append(": ");
+ switch (record.getType().getFieldTypes()[i].getTypeTag()) {
+ case STRING:
+ sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
+ break;
+ }
+ }
+ sb.append(" }");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java
new file mode 100644
index 0000000..c55c132
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedTrackingManager implements IFeedTrackingManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
+
+ private final BitSet allOnes;
+
+ private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
+ private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
+
+ public FeedTrackingManager() {
+ byte[] allOneBytes = new byte[128];
+ Arrays.fill(allOneBytes, (byte) 0xff);
+ allOnes = BitSet.valueOf(allOneBytes);
+ ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
+ maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
+ }
+
+ @Override
+ public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
+ AckId ackId = getAckId(ackMessage);
+ Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
+ if (acksForConnection == null) {
+ acksForConnection = new HashMap<AckId, BitSet>();
+ acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
+ ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
+ }
+ BitSet currentAcks = acksForConnection.get(ackId);
+ if (currentAcks == null) {
+ currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
+ acksForConnection.put(ackId, currentAcks);
+ } else {
+ currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
+ }
+ if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
+ }
+ Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
+ if (maxBaseAckedForConnection == null) {
+ maxBaseAckedForConnection = new HashMap<AckId, Integer>();
+ maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
+ }
+ Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
+ if (maxBaseAckedValue == null) {
+ maxBaseAckedValue = ackMessage.getBase();
+ maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+ sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+ ackMessage.getBase());
+ } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
+ maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+ sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+ ackMessage.getBase());
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
+ }
+ }
+
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
+ }
+ }
+ }
+
+ public synchronized void disableTracking(FeedConnectionId connectionId) {
+ ackHistory.remove(connectionId);
+ maxBaseAcked.remove(connectionId);
+ }
+
+ private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
+ FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
+ List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+ List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+ String collectLocation = collectLocations.get(partition);
+ Set<String> messageDestinations = new HashSet<String>();
+ messageDestinations.add(collectLocation);
+ messageDestinations.addAll(storageLocations);
+ try {
+ JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
+ CentralFeedManager.runJob(spec, false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
+ }
+ }
+ }
+
+ private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
+ return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
+ }
+
+ private static class AckId {
+ private FeedConnectionId connectionId;
+ private int intakePartition;
+ private int base;
+
+ public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
+ this.connectionId = connectionId;
+ this.intakePartition = intakePartition;
+ this.base = base;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AckId)) {
+ return false;
+ }
+ AckId other = (AckId) o;
+ return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
+ && other.getBase() == base;
+ }
+
+ @Override
+ public String toString() {
+ return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public int getIntakePartition() {
+ return intakePartition;
+ }
+
+ public int getBase() {
+ return base;
+ }
+
+ }
+
+ @Override
+ public void disableAcking(FeedConnectionId connectionId) {
+ ackHistory.remove(connectionId);
+ maxBaseAcked.remove(connectionId);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Acking disabled for " + connectionId);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java
new file mode 100644
index 0000000..834784c
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedWorkRequestResponseHandler implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
+
+ private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
+
+ private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
+
+ public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
+ this.inbox = inbox;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ IClusterManagementWorkResponse response = null;
+ try {
+ response = inbox.take();
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Interrupted exception " + e.getMessage());
+ }
+ }
+ IClusterManagementWork submittedWork = response.getWork();
+ Map<String, String> nodeSubstitution = new HashMap<String, String>();
+ switch (submittedWork.getClusterManagementWorkType()) {
+ case ADD_NODE:
+ AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
+ int workId = addNodeWork.getWorkId();
+ Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
+ AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+ List<String> nodesAdded = resp.getNodesAdded();
+ List<String> unsubstitutedNodes = new ArrayList<String>();
+ unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
+ int nodeIndex = 0;
+
+ /** form a mapping between the failed node and its substitute **/
+ if (nodesAdded != null && nodesAdded.size() > 0) {
+ for (String failedNodeId : addNodeWork.getDeadNodes()) {
+ String substitute = nodesAdded.get(nodeIndex);
+ nodeSubstitution.put(failedNodeId, substitute);
+ nodeIndex = (nodeIndex + 1) % nodesAdded.size();
+ unsubstitutedNodes.remove(failedNodeId);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
+ }
+ }
+ }
+ if (unsubstitutedNodes.size() > 0) {
+ String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(
+ new String[] {});
+ nodeIndex = 0;
+ for (String unsubstitutedNode : unsubstitutedNodes) {
+ nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
+ + unsubstitutedNode);
+ }
+ nodeIndex = (nodeIndex + 1) % participantNodes.length;
+ }
+
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
+ }
+ }
+
+ // alter failed feed intake jobs
+
+ for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
+ String failedNode = entry.getKey();
+ List<FeedJobInfo> impactedJobInfos = entry.getValue();
+ for (FeedJobInfo info : impactedJobInfos) {
+ JobSpecification spec = info.getSpec();
+ replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
+ info.setSpec(spec);
+ }
+ }
+
+ Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
+ Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
+
+ for (List<FeedJobInfo> infos : failureAnalysis.values()) {
+ for (FeedJobInfo info : infos) {
+ switch (info.getJobType()) {
+ case INTAKE:
+ revisedIntakeJobs.add((FeedIntakeInfo) info);
+ break;
+ case FEED_CONNECT:
+ revisedConnectJobInfos.add((FeedConnectJobInfo) info);
+ break;
+ }
+ }
+ }
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ try {
+ for (FeedIntakeInfo info : revisedIntakeJobs) {
+ hcc.startJob(info.getSpec());
+ }
+ Thread.sleep(2000);
+ for (FeedConnectJobInfo info : revisedConnectJobInfos) {
+ hcc.startJob(info.getSpec());
+ Thread.sleep(2000);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start revised job post failure");
+ }
+ }
+
+ break;
+ case REMOVE_NODE:
+ throw new IllegalStateException("Invalid work submitted");
+ }
+ }
+ }
+
+ private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
+ Set<Constraint> userConstraints = jobSpec.getUserConstraints();
+ List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+ List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+ List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+ Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+ Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+ OperatorDescriptorId opId = null;
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ if (modifiedOperators.contains(opId)) {
+ countConstraintsToReplace.add(constraint);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ if (oldLocation.equals(failedNodeId)) {
+ locationConstraintsToReplace.add(constraint);
+ modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+ Map<Integer, String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new HashMap<Integer, String>();
+ newConstraints.put(opId, newLocs);
+ }
+ int partition = ((PartitionLocationExpression) lexpr).getPartition();
+ newLocs.put(partition, replacementNode);
+ } else {
+ if (modifiedOperators.contains(opId)) {
+ locationConstraintsToReplace.add(constraint);
+ Map<Integer, String> newLocs = newConstraints.get(opId);
+ if (newLocs == null) {
+ newLocs = new HashMap<Integer, String>();
+ newConstraints.put(opId, newLocs);
+ }
+ int partition = ((PartitionLocationExpression) lexpr).getPartition();
+ newLocs.put(partition, oldLocation);
+ } else {
+ List<Constraint> clist = candidateConstraints.get(opId);
+ if (clist == null) {
+ clist = new ArrayList<Constraint>();
+ candidateConstraints.put(opId, clist);
+ }
+ clist.add(constraint);
+ }
+ }
+ break;
+ }
+ }
+
+ jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+ jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+ for (OperatorDescriptorId mopId : modifiedOperators) {
+ List<Constraint> clist = candidateConstraints.get(mopId);
+ if (clist != null && !clist.isEmpty()) {
+ jobSpec.getUserConstraints().removeAll(clist);
+
+ for (Constraint c : clist) {
+ if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+ ConstraintExpression cexpr = c.getRValue();
+ int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ newConstraints.get(mopId).put(partition, oldLocation);
+ }
+ }
+ }
+ }
+
+ for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
+ OperatorDescriptorId nopId = entry.getKey();
+ Map<Integer, String> clist = entry.getValue();
+ IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+ String[] locations = new String[clist.size()];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = clist.get(i);
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
+ }
+
+ }
+
+ public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
+ feedsWaitingForResponse.put(workId, impactedJobs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java
new file mode 100644
index 0000000..4b5c8ef
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java
@@ -0,0 +1,95 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class FeedsActivator implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
+
+ private List<FeedCollectInfo> feedsToRevive;
+ private Mode mode;
+
+ public enum Mode {
+ REVIVAL_POST_CLUSTER_REBOOT,
+ REVIVAL_POST_NODE_REJOIN
+ }
+
+ public FeedsActivator() {
+ this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+ }
+
+ public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
+ this.feedsToRevive = feedsToRevive;
+ this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+ }
+
+ @Override
+ public void run() {
+ switch (mode) {
+ case REVIVAL_POST_CLUSTER_REBOOT:
+ //revivePostClusterReboot();
+ break;
+ case REVIVAL_POST_NODE_REJOIN:
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Attempt to resume feed interrupted");
+ }
+ throw new IllegalStateException(e1.getMessage());
+ }
+ for (FeedCollectInfo finfo : feedsToRevive) {
+ try {
+ JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+ LOGGER.info("Job:" + finfo.jobSpec);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+ PrintWriter writer = new PrintWriter(System.out, true);
+ SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+ try {
+ DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+ ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
+ new Identifier(dataset), feedPolicy, 0);
+ stmt.setForceConnect(true);
+ List<Statement> statements = new ArrayList<Statement>();
+ statements.add(dataverseDecl);
+ statements.add(stmt);
+ AqlTranslator translator = new AqlTranslator(statements, pc);
+ translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
+ + feedPolicy + " Exception " + e.getMessage());
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index 5e52f3e..28ee1f2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -14,15 +14,34 @@
*/
package edu.uci.ics.asterix.file;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.List;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.message.EndFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.PrepareStallMessage;
+import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -33,44 +52,200 @@ import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
*/
public class FeedOperations {
- private static final Logger LOGGER = Logger.getLogger(IndexOperations.class.getName());
-
/**
- * @param controlFeedStatement
- * The statement representing the action that describes the
- * action that needs to be taken on the feed. E.g. of actions are
- * stop feed or alter feed.
+ * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
+ *
+ * @param primaryFeed
* @param metadataProvider
- * An instance of the MetadataProvider
- * @return An instance of JobSpec for the job that would send an appropriate
- * control message to the running feed.
- * @throws AsterixException
- * @throws AlgebricksException
+ * @return JobSpecification the Hyracks job specification for receiving data from external source
+ * @throws Exception
*/
- public static JobSpecification buildDisconnectFeedJobSpec(String dataverseName, String feedName,
- String datasetName, AqlMetadataProvider metadataProvider, FeedActivity feedActivity)
- throws AsterixException, AlgebricksException {
+ public static Pair<JobSpecification, IFeedAdapterFactory> buildFeedIntakeJobSpec(PrimaryFeed primaryFeed,
+ AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IOperatorDescriptor feedMessenger;
- AlgebricksPartitionConstraint messengerPc;
+ spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+ IFeedAdapterFactory adapterFactory = null;
+ IOperatorDescriptor feedIngestor;
+ AlgebricksPartitionConstraint ingesterPc;
try {
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider
- .buildDisconnectFeedMessengerRuntime(spec, dataverseName, feedName, datasetName, feedActivity);
- feedMessenger = p.first;
- messengerPc = p.second;
+ Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> t = metadataProvider
+ .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
+ feedIngestor = t.first;
+ ingesterPc = t.second;
+ adapterFactory = t.third;
} catch (AlgebricksException e) {
+ e.printStackTrace();
throw new AsterixException(e);
}
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
+ spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
+ spec.addRoot(nullSink);
+ return new Pair<JobSpecification, IFeedAdapterFactory>(spec, adapterFactory);
+ }
+
+ public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
+ throws AsterixException, AlgebricksException {
+
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IOperatorDescriptor feedMessenger = null;
+ AlgebricksPartitionConstraint messengerPc = null;
+
+ List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
+ locations);
+
+ feedMessenger = p.first;
+ messengerPc = p.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
spec.addRoot(nullSink);
+
return spec;
+ }
+
+ /**
+ * Builds the job spec for sending message to an active feed to disconnect it from the
+ * its source.
+ */
+ public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(AqlMetadataProvider metadataProvider,
+ FeedConnectionId connectionId) throws AsterixException, AlgebricksException {
+
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ IOperatorDescriptor feedMessenger;
+ AlgebricksPartitionConstraint messengerPc;
+ List<String> locations = null;
+ FeedRuntimeType sourceRuntimeType;
+ try {
+ FeedConnectJobInfo cInfo = FeedLifecycleListener.INSTANCE.getFeedConnectJobInfo(connectionId);
+ IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
+ IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
+
+ boolean terminateIntakeJob = false;
+ boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
+ if (completeDisconnect) {
+ sourceRuntimeType = FeedRuntimeType.INTAKE;
+ locations = cInfo.getCollectLocations();
+ terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
+ } else {
+ locations = cInfo.getComputeLocations();
+ sourceRuntimeType = FeedRuntimeType.COMPUTE;
+ }
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
+ connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
+
+ feedMessenger = p.first;
+ messengerPc = p.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+ spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+ spec.addRoot(nullSink);
+ return new Pair<JobSpecification, Boolean>(spec, terminateIntakeJob);
+
+ } catch (AlgebricksException e) {
+ throw new AsterixException(e);
+ }
+
+ }
+
+ public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
+ Collection<String> collectLocations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
+ ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
+ List<String> collectLocations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
+ Collection<String> targetLocations) throws AsterixException {
+ JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+ try {
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+ messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
+ buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+ } catch (AlgebricksException ae) {
+ throw new AsterixException(ae);
+ }
+ return messageJobSpec;
+ }
+
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
+ JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
+ FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
+ IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
+ feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
+ return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
+ }
+
+ private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
+ JobSpecification jobSpec, FeedConnectionId feedConenctionId, IFeedMessage feedMessage,
+ Collection<String> locations) throws AlgebricksException {
+ AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ locations.toArray(new String[] {}));
+ FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId,
+ feedMessage);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
+ }
+
+ private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
+ AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
+ messengerPc);
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
+ messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
+ messageJobSpec.addRoot(nullSink);
+ return messageJobSpec;
+ }
+ private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
+ JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations,
+ FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
+ throws AlgebricksException {
+ IFeedMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId,
+ completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
+ return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
index 0af6a7e..8e633a9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -19,16 +19,17 @@ import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
import edu.uci.ics.asterix.file.ExternalIndexingOperations;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
@@ -36,14 +37,14 @@ import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
- private static State state;
+ private static ClusterState state;
private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
private HyracksConnection hcc;
public static AsterixGlobalRecoveryManager INSTANCE;
@@ -63,8 +64,8 @@ public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
@Override
public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
// perform global recovery if state changed to active
- final State newState = AsterixClusterProperties.INSTANCE.getState();
- boolean needToRecover = !newState.equals(state) && (newState == State.ACTIVE);
+ final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+ boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
if (needToRecover) {
Thread recoveryThread = new Thread(new Runnable() {
@Override
@@ -79,7 +80,7 @@ public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
for (Dataverse dataverse : dataverses) {
if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse);
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
dataverse.getDataverseName());
for (Dataset dataset : datasets) {
@@ -206,7 +207,7 @@ public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
}
@Override
- public void notifyStateChange(State previousState, State newState) {
+ public void notifyStateChange(ClusterState previousState, ClusterState newState) {
// Do nothing?
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 9fa9a76..6fcc248 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -26,8 +26,6 @@ import edu.uci.ics.asterix.api.http.servlet.APIServlet;
import edu.uci.ics.asterix.api.http.servlet.AQLAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.ConnectorAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
-import edu.uci.ics.asterix.api.http.servlet.FeedDashboardServlet;
-import edu.uci.ics.asterix.api.http.servlet.FeedDataProviderServlet;
import edu.uci.ics.asterix.api.http.servlet.FeedServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
@@ -36,6 +34,9 @@ import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
@@ -55,6 +56,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
private Server webServer;
private Server jsonAPIServer;
private Server feedServer;
+ private ICentralFeedManager centralFeedManager;
private static IAsterixStateProxy proxy;
private ICCApplicationContext appCtx;
@@ -89,6 +91,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
setupFeedServer(externalProperties);
feedServer.start();
+ centralFeedManager = CentralFeedManager.getInstance();
+ centralFeedManager.start();
waitUntilServerStart(webServer);
waitUntilServerStart(jsonAPIServer);
@@ -171,9 +175,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
feedServer.setHandler(context);
context.addServlet(new ServletHolder(new FeedServlet()), "/");
- context.addServlet(new ServletHolder(new FeedDashboardServlet()), "/feed/dashboard");
- context.addServlet(new ServletHolder(new FeedDataProviderServlet()), "/feed/data");
-
+
// add paths here
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 950afe4..c8f778d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -23,15 +23,15 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse.Status;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWorkResponse;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -141,8 +141,8 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
for (IClusterManagementWork w : workSet) {
switch (w.getClusterManagementWorkType()) {
case ADD_NODE:
- if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
- nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+ if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodesRequested()) {
+ nodesToAdd = ((AddNodeWork) w).getNumberOfNodesRequested();
}
nodeAdditionRequests.add((AddNodeWork) w);
break;
@@ -181,7 +181,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
}
for (AddNodeWork w : nodeAdditionRequests) {
- int n = w.getNumberOfNodes();
+ int n = w.getNumberOfNodesRequested();
List<String> nodesToBeAddedForWork = new ArrayList<String>();
for (int i = 0; i < n && i < addedNodes.size(); i++) {
nodesToBeAddedForWork.add(addedNodes.get(i));
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 77581c7..e05f406 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -20,9 +20,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
@@ -50,8 +50,8 @@ public class ClusterWorkExecutor implements Runnable {
for (IClusterManagementWork w : workSet) {
switch (w.getClusterManagementWorkType()) {
case ADD_NODE:
- if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
- nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+ if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodesRequested()) {
+ nodesToAdd = ((AddNodeWork) w).getNumberOfNodesRequested();
}
nodeAdditionRequests.add(w);
break;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index d2f3345..62dca1f 100755
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -124,9 +124,8 @@ public class ExternalLibraryBootstrap {
List<edu.uci.ics.asterix.metadata.entities.DatasourceAdapter> adapters = MetadataManager.INSTANCE
.getDataverseAdapters(mdTxnCtx, dataverse);
for (edu.uci.ics.asterix.metadata.entities.DatasourceAdapter adapter : adapters) {
- if (adapter.getAdapterIdentifier().getAdapterName().startsWith(libraryName + "#")) {
- MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier()
- .getAdapterName());
+ if (adapter.getAdapterIdentifier().getName().startsWith(libraryName + "#")) {
+ MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName());
}
}
@@ -145,11 +144,12 @@ public class ExternalLibraryBootstrap {
private static void installLibraryIfNeeded(String dataverse, final File libraryDir,
Map<String, List<String>> uninstalledLibs) throws Exception {
- String libraryName = libraryDir.getName();
+ String libraryName = libraryDir.getName().trim();
List<String> uninstalledLibsInDv = uninstalledLibs.get(dataverse);
boolean wasUninstalled = uninstalledLibsInDv != null && uninstalledLibsInDv.contains(libraryName);
MetadataTransactionContext mdTxnCtx = null;
+ MetadataManager.INSTANCE.acquireWriteLatch();
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
edu.uci.ics.asterix.metadata.entities.Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(
@@ -168,7 +168,7 @@ public class ExternalLibraryBootstrap {
ExternalLibrary library = getLibrary(new File(libraryDir + File.separator + libraryDescriptors[0]));
if (libraryDescriptors.length == 0) {
- throw new Exception("No library descriptors defined");
+ throw new Exception("No library descriptor defined");
} else if (libraryDescriptors.length > 1) {
throw new Exception("More than 1 library descriptors defined");
}
@@ -186,12 +186,12 @@ public class ExternalLibraryBootstrap {
args.add(arg);
}
edu.uci.ics.asterix.metadata.entities.Function f = new edu.uci.ics.asterix.metadata.entities.Function(
- dataverse, libraryName + "#" + function.getName(), args.size(), args,
- function.getReturnType(), function.getDefinition(), library.getLanguage(),
- function.getFunctionType());
+ dataverse, libraryName + "#" + function.getName().trim(), args.size(), args, function
+ .getReturnType().trim(), function.getDefinition().trim(), library.getLanguage()
+ .trim(), function.getFunctionType().trim());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, f);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Installed function: " + libraryName + "#" + function.getName());
+ LOGGER.info("Installed function: " + libraryName + "#" + function.getName().trim());
}
}
}
@@ -202,8 +202,8 @@ public class ExternalLibraryBootstrap {
if (library.getLibraryAdapters() != null) {
for (LibraryAdapter adapter : library.getLibraryAdapters().getLibraryAdapter()) {
- String adapterFactoryClass = adapter.getFactoryClass();
- String adapterName = libraryName + "#" + adapter.getName();
+ String adapterFactoryClass = adapter.getFactoryClass().trim();
+ String adapterName = libraryName + "#" + adapter.getName().trim();
AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, AdapterType.EXTERNAL);
MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
@@ -231,6 +231,8 @@ public class ExternalLibraryBootstrap {
LOGGER.info("Exception in installing library " + libraryName);
}
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ } finally {
+ MetadataManager.INSTANCE.releaseWriteLatch();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java
new file mode 100644
index 0000000..497281b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java
@@ -0,0 +1,50 @@
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class FeedBootstrap {
+
+ public final static String FEEDS_METADATA_DV = "feeds_metadata";
+ public final static String FAILED_TUPLE_DATASET = "failed_tuple";
+ public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
+ public final static String FAILED_TUPLE_DATASET_KEY = "id";
+
+ public static void setUpInitialArtifacts() throws Exception {
+
+ StringBuilder builder = new StringBuilder();
+ try {
+ builder.append("create dataverse " + FEEDS_METADATA_DV + ";" + "\n");
+ builder.append("use dataverse " + FEEDS_METADATA_DV + ";" + "\n");
+
+ builder.append("create type " + FAILED_TUPLE_DATASET_TYPE + " as open { ");
+
+ String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple",
+ "message", "timestamp" };
+ IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (i > 0) {
+ builder.append(",");
+ }
+ builder.append(fieldNames[i] + ":");
+ builder.append(fieldTypes[i].getTypeName());
+ }
+ builder.append("}" + ";" + "\n");
+
+ builder.append("create dataset " + FAILED_TUPLE_DATASET + " " + "(" + FAILED_TUPLE_DATASET_TYPE + ")" + " "
+ + "primary key " + FAILED_TUPLE_DATASET_KEY + " on " + MetadataConstants.METADATA_NODEGROUP_NAME
+ + ";");
+
+ CentralFeedManager.AQLExecutor.executeAQL(builder.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Error: " + builder.toString());
+ throw e;
+ }
+ }
+
+}