You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:01 UTC
[04/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 6380269..09bc5c3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -14,40 +14,66 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedCollectRuntimeInputHandler;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector;
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector.State;
+import edu.uci.ics.asterix.common.feeds.FeedId;
import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.IntakePartitionStatistics;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import edu.uci.ics.asterix.common.feeds.StorageSideMonitoredBuffer;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.message.EndFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
- * Runtime for the @see{FeedMessageOperatorDescriptor}
+ * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
+ * a feed message to the local feed manager on the host node controller.
+ *
+ * @see FeedMessageOperatorDescriptor
+ * IFeedMessage
+ * IFeedManager
*/
public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
- private final FeedConnectionId feedId;
- private final IFeedMessage feedMessage;
- private final int partition;
- private final IHyracksTaskContext ctx;
+ private final FeedConnectionId connectionId;
+ private final IFeedMessage message;
private final IFeedManager feedManager;
+ private final int partition;
- public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedMessage feedMessage,
- int partition, int nPartitions) {
- this.feedId = feedId;
- this.feedMessage = feedMessage;
+ public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
+ IFeedMessage feedMessage, int partition, int nPartitions) {
+ this.connectionId = connectionId;
+ this.message = feedMessage;
this.partition = partition;
- this.ctx = ctx;
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.feedManager = runtimeCtx.getFeedManager();
@@ -57,47 +83,38 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
public void initialize() throws HyracksDataException {
try {
writer.open();
- FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
- FeedRuntime feedRuntime = feedManager.getFeedRuntime(runtimeId);
- boolean ingestionLocation = feedRuntime != null;
-
- switch (feedMessage.getMessageType()) {
+ switch (message.getMessageType()) {
case END:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ending feed:" + feedId);
- }
-
- if (ingestionLocation) {
- AdapterRuntimeManager adapterRuntimeMgr = ((IngestionRuntime) feedRuntime)
- .getAdapterRuntimeManager();
- adapterRuntimeMgr.stop();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Terminating ingestion for :" + feedId);
- }
+ EndFeedMessage endFeedMessage = (EndFeedMessage) message;
+ switch (endFeedMessage.getEndMessageType()) {
+ case DISCONNECT_FEED:
+ hanldeDisconnectFeedTypeMessage(endFeedMessage);
+ break;
+ case DISCONTINUE_SOURCE:
+ handleDiscontinueFeedTypeMessage(endFeedMessage);
+ break;
}
break;
-
- case SUPER_FEED_MANAGER_ELECT:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registering Supers Feed Manager for :" + feedId);
- }
- FeedManagerElectMessage mesg = ((FeedManagerElectMessage) feedMessage);
- SuperFeedManager sfm = new SuperFeedManager(mesg.getFeedId(), mesg.getHost(), mesg.getNodeId(),
- mesg.getPort(), feedManager);
- synchronized (feedManager) {
- INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
- String nodeId = ncCtx.getNodeId();
- if (sfm.getNodeId().equals(nodeId)) {
- sfm.setLocal(true);
- } else {
- Thread.sleep(5000);
- }
- feedManager.registerSuperFeedManager(feedId, sfm);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered super feed mgr " + sfm + " for feed " + feedId);
- }
- }
+ case PREPARE_STALL: {
+ handlePrepareStallMessage((PrepareStallMessage) message);
+ break;
+ }
+ case TERMINATE_FLOW: {
+ FeedConnectionId connectionId = ((TerminateDataFlowMessage) message).getConnectionId();
+ handleTerminateFlowMessage(connectionId);
+ break;
+ }
+ case COMMIT_ACK_RESPONSE: {
+ handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
+ break;
+ }
+ case THROTTLING_ENABLED: {
+ handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
+ break;
+ }
+ default:
break;
+
}
} catch (Exception e) {
@@ -106,4 +123,172 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
writer.close();
}
}
+
+ private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
+ FeedConnectionId connectionId = throttlingMessage.getConnectionId();
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+ for (FeedRuntimeId runtimeId : runtimes) {
+ if (runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
+ FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
+ ((StorageSideMonitoredBuffer) (storeRuntime.getInputHandler().getmBuffer())).setAcking(false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline "
+ + connectionId);
+ }
+ }
+ }
+ }
+
+ private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
+ FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+ for (FeedRuntimeId runtimeId : runtimes) {
+ FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+ switch (runtimeId.getFeedRuntimeType()) {
+ case COLLECT:
+ FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler) runtime
+ .getInputHandler();
+ int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
+ inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
+ break;
+ case STORE:
+ MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer()
+ .getStorageTimeTrackingRateTask();
+ sTask.receiveCommitAckResponse(commitResponseMessage);
+ break;
+ }
+ }
+
+ commitResponseMessage.getIntakePartition();
+ SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.INTAKE,
+ partition);
+ IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
+ .getSubscribableRuntime(sid);
+ if (ingestionRuntime != null) {
+ IIntakeProgressTracker tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker();
+ if (tracker != null) {
+ tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
+ }
+ }
+ }
+
+ private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+
+ boolean found = false;
+ for (FeedRuntimeId runtimeId : feedRuntimes) {
+ FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+ if (runtime.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
+ ((CollectionRuntime) runtime).getFrameCollector().setState(State.HANDOVER);
+ found = true;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switched " + runtime + " to Hand Over stage");
+ }
+ }
+ }
+ if (!found) {
+ throw new HyracksDataException("COLLECT Runtime not found!");
+ }
+ }
+
+ private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
+ FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
+ int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+ for (FeedRuntimeId runtimeId : feedRuntimes) {
+ FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+ switch (runtimeId.getFeedRuntimeType()) {
+ case COMPUTE:
+ Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? Mode.STALL
+ : Mode.END;
+ runtime.setMode(requiredMode);
+ break;
+ default:
+ runtime.setMode(Mode.STALL);
+ break;
+ }
+ }
+ }
+
+ private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+ FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
+ SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+ FeedRuntimeType.INTAKE, partition);
+ ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager().getSubscribableRuntime(
+ subscribableRuntimeId);
+ IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
+ adapterRuntimeManager.stop();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
+ }
+ }
+
+ private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
+ }
+ FeedRuntimeId runtimeId = null;
+ FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
+ if (endFeedMessage.isCompleteDisconnection()) {
+ // subscribableRuntimeType represents the location at which the feed connection receives data
+ FeedRuntimeType runtimeType = null;
+ switch (subscribableRuntimeType) {
+ case INTAKE:
+ runtimeType = FeedRuntimeType.COLLECT;
+ break;
+ case COMPUTE:
+ runtimeType = FeedRuntimeType.COMPUTE_COLLECT;
+ break;
+ default:
+ throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
+ }
+
+ runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+ CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(
+ connectionId, runtimeId);
+ feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
+ }
+ } else {
+ // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+ switch (subscribableRuntimeType) {
+ case INTAKE:
+ // illegal state as data hand-off from one feed to another does not happen at intake
+ throw new IllegalStateException("Illegal State, invalid runtime type " + subscribableRuntimeType);
+ case COMPUTE:
+ // feed could be primary or secondary, doesn't matter
+ SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
+ connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
+ ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager().getSubscribableRuntime(
+ feedSubscribableRuntimeId);
+ DistributeFeedFrameWriter dWriter = (DistributeFeedFrameWriter) feedRuntime.getFeedFrameWriter();
+ Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
+
+ IFrameWriter unsubscribingWriter = null;
+ for (Entry<IFrameWriter, FeedFrameCollector> entry : registeredCollectors.entrySet()) {
+ IFrameWriter frameWriter = entry.getKey();
+ FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler) frameWriter;
+ if (feedFrameWriter.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
+ unsubscribingWriter = feedFrameWriter;
+ dWriter.unsubscribeFeed(unsubscribingWriter);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
+ }
+ break;
+ }
+ }
+ break;
+ }
+
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Unsubscribed from feed :" + connectionId);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
new file mode 100644
index 0000000..371e5b6
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.SubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
+
+ /** Runtime node pushable corresponding to the core feed operator **/
+ private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+ /**
+ * A policy enforcer that ensures dynamic decisions for a feed are taken
+ * in accordance with the associated ingestion policy
+ **/
+ private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime
+ * captures the state of the operator while the feed is active.
+ */
+ private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents
+ * the flow of data from a feed to a dataset.
+ **/
+ private FeedConnectionId connectionId;
+
+ /**
+ * Denotes the i'th operator instance in a setting where K operator
+ * instances are scheduled to run in parallel
+ **/
+ private int partition;
+
+ private int nPartitions;
+
+ /** The (singleton) instance of IFeedManager **/
+ private IFeedManager feedManager;
+
+ private FrameTupleAccessor fta;
+
+ private final IHyracksTaskContext ctx;
+
+ private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
+
+ private FeedRuntimeInputHandler inputSideHandler;
+
+ public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+ int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+ this.ctx = ctx;
+ this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getFeedManager();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.feedManager = runtimeCtx.getFeedManager();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
+ try {
+ feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ if (feedRuntime == null) {
+ initializeNewFeedRuntime(runtimeId);
+ } else {
+ reviveOldFeedRuntime(runtimeId);
+ }
+ writer.open();
+ coreOperator.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+ policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
+ nPartitions);
+
+ DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
+ runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+ coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+
+ feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
+ recordDesc);
+ feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+
+ distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+ }
+
+ private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = feedRuntime.getInputHandler();
+ this.inputSideHandler.setCoreOperator(coreOperator);
+
+ DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
+ runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+ coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+ distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+
+ inputSideHandler.reset(nPartitions);
+ feedRuntime.setMode(Mode.PROCESS);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ inputSideHandler.nextFrame(buffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
+ }
+ feedRuntime.setMode(Mode.FAIL);
+ coreOperator.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+ boolean end = inputSideHandler.getMode().equals(Mode.END);
+ try {
+ if (inputSideHandler != null) {
+ if (!(stalled || end)) {
+ inputSideHandler.nextFrame(null); // signal end of data
+ while (!inputSideHandler.isFinished()) {
+ synchronized (coreOperator) {
+ coreOperator.wait();
+ }
+ }
+ } else {
+ inputSideHandler.setFinished(true);
+ }
+ }
+ coreOperator.close();
+ System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (!stalled) {
+ deregister();
+ System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ } else {
+ System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ }
+ if (inputSideHandler != null) {
+ inputSideHandler.close();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Operator " + this.feedRuntime.getRuntimeId());
+ }
+ }
+ }
+
+ private void deregister() {
+ if (feedRuntime != null) {
+ // deregister from subscription manager
+ SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
+ feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
+
+ // deregister from connection manager
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+ ((FeedRuntime) feedRuntime).getRuntimeId());
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java
new file mode 100644
index 0000000..d348cfc
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java
@@ -0,0 +1,170 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
+
+ /** Runtime node pushable corresponding to the core feed operator **/
+ private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+ /**
+ * A policy enforcer that ensures dyanmic decisions for a feed are taken
+ * in accordance with the associated ingestion policy
+ **/
+ private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime
+ * captures the state of the operator while the feed is active.
+ */
+ private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents
+ * the flow of data from a feed to a dataset.
+ **/
+ private FeedConnectionId connectionId;
+
+ /**
+ * Denotes the i'th operator instance in a setting where K operator
+ * instances are scheduled to run in parallel
+ **/
+ private int partition;
+
+ /** Total number of partitions available **/
+ private int nPartitions;
+
+ /** Type associated with the core feed operator **/
+ private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
+
+ /** The (singleton) instance of IFeedManager **/
+ private IFeedManager feedManager;
+
+ private FrameTupleAccessor fta;
+
+ private final IHyracksTaskContext ctx;
+
+ private final String operandId;
+
+ /** The pre-processor associated with this runtime **/
+ private FeedRuntimeInputHandler inputSideHandler;
+
+ public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+ this.ctx = ctx;
+ this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getFeedManager();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.feedManager = runtimeCtx.getFeedManager();
+ this.operandId = operationId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ try {
+ feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ if (feedRuntime == null) {
+ initializeNewFeedRuntime(runtimeId);
+ } else {
+ reviveOldFeedRuntime(runtimeId);
+ }
+ coreOperator.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId,
+ (AbstractUnaryInputUnaryOutputOperatorNodePushable) coreOperator,
+ policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager,
+ nPartitions);
+
+ setupBasicRuntime(inputSideHandler);
+ }
+
+ private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.inputSideHandler = feedRuntime.getInputHandler();
+ this.fta = new FrameTupleAccessor(recordDesc);
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ feedRuntime.setMode(Mode.PROCESS);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
+ }
+ }
+
+ private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ inputSideHandler.nextFrame(buffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+ }
+ feedRuntime.setMode(Mode.FAIL);
+ coreOperator.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ coreOperator.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // ignore
+ } finally {
+ if (inputSideHandler != null) {
+ inputSideHandler.close();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Operator " + this.feedRuntime.getRuntimeId());
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index d636e04..953853d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,75 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.feeds;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Map;
-import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeState;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
/**
* FeedMetaOperatorDescriptor is a wrapper operator that provides a sanboox like
- * environment for an hyracks operator that is part of a feed ingestion pipeline.
- * The MetaFeed operator provides an interface iden- tical to that offered by the
- * underlying wrapped operator, hereafter referred to as the core operator.
- * As seen by Hyracks, the altered pipeline is identical to the earlier version formed
- * from core operators. The MetaFeed operator enhances each core operator by providing
- * functionality for handling runtime exceptions, saving any state for future retrieval,
- * and measuring/reporting of performance characteristics. We next describe how the added
- * functionality contributes to providing fault- tolerance.
+ * environment for an hyracks operator that is part of a feed ingestion
+ * pipeline. The MetaFeed operator provides an interface iden- tical to that
+ * offered by the underlying wrapped operator, hereafter referred to as the core
+ * operator. As seen by Hyracks, the altered pipeline is identical to the
+ * earlier version formed from core operators. The MetaFeed operator enhances
+ * each core operator by providing functionality for handling runtime
+ * exceptions, saving any state for future retrieval, and measuring/reporting of
+ * performance characteristics. We next describe how the added functionality
+ * contributes to providing fault- tolerance.
*/
public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
-
- /** The actual (Hyracks) operator that is wrapped around by the Metafeed Adapter **/
+ /**
+ * The actual (Hyracks) operator that is wrapped around by the MetaFeed
+ * operator.
+ **/
private IOperatorDescriptor coreOperator;
/**
- * A unique identifier for the feed instance. A feed instance represents the flow of data
- * from a feed to a dataset.
+ * A unique identifier for the feed instance. A feed instance represents the
+ * flow of data from a feed to a dataset.
**/
private final FeedConnectionId feedConnectionId;
/**
* The policy associated with the feed instance.
- */
- private final FeedPolicy feedPolicy;
+ **/
+ private final Map<String, String> feedPolicyProperties;
/**
* type for the feed runtime associated with the operator.
- * Possible values: INGESTION, COMPUTE, STORAGE, COMMIT
- */
+ * Possible values: COMPUTE, STORE, OTHER
+ **/
private final FeedRuntimeType runtimeType;
private final String operandId;
public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
- IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType,
- String operandId) {
+ IOperatorDescriptor coreOperatorDescriptor, Map<String, String> feedPolicyProperties,
+ FeedRuntimeType runtimeType, boolean enableSubscriptionMode, String operandId) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
- this.feedPolicy = feedPolicy;
+ this.feedPolicyProperties = feedPolicyProperties;
if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
}
@@ -81,8 +86,30 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
- feedConnectionId, feedPolicy, runtimeType, operandId);
+ IOperatorNodePushable nodePushable = null;
+ switch (runtimeType) {
+ case COMPUTE:
+ nodePushable = new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions,
+ coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+ break;
+ case STORE:
+ nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
+ coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+ break;
+ case OTHER:
+ nodePushable = new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
+ feedConnectionId, feedPolicyProperties, operandId);
+ break;
+ case ETS:
+ nodePushable = ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx,
+ recordDescProvider, partition, nPartitions);
+ break;
+ case JOIN:
+ break;
+ default:
+ throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
+ }
+ return nodePushable;
}
@Override
@@ -90,173 +117,12 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
return "FeedMeta [" + coreOperator + " ]";
}
- private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
- /** Runtime node pushable corresponding to the core feed operator **/
- private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
-
- /**
- * A policy enforcer that ensures dyanmic decisions for a feed are taken in accordance
- * with the associated ingestion policy
- **/
- private FeedPolicyEnforcer policyEnforcer;
-
- /**
- * The Feed Runtime instance associated with the operator. Feed Runtime captures the state of the operator while
- * the feed is active.
- */
- private FeedRuntime feedRuntime;
-
- /**
- * A unique identifier for the feed instance. A feed instance represents the flow of data
- * from a feed to a dataset.
- **/
- private FeedConnectionId feedId;
-
- /** Denotes the i'th operator instance in a setting where K operator instances are scheduled to run in parallel **/
- private int partition;
-
- /** A buffer that is used to hold the current frame that is being processed **/
- private ByteBuffer currentBuffer;
-
- /** Type associated with the core feed operator **/
- private final FeedRuntimeType runtimeType;
-
- /** True is the feed is recovering from a previous failed execution **/
- private boolean resumeOldState;
-
- /** The Node Controller ID for the host NC **/
-
- private String nodeId;
-
- /** Allows to iterate over the tuples in a frame **/
- private FrameTupleAccessor fta;
-
- /** The (singleton) instance of IFeedManager **/
- private IFeedManager feedManager;
-
- private final String operandId;
-
- public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
- FeedPolicy feedPolicy, FeedRuntimeType runtimeType, String operationId) throws HyracksDataException {
- this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
- .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
- this.partition = partition;
- this.runtimeType = runtimeType;
- this.feedId = feedConnectionId;
- this.nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
- fta = new FrameTupleAccessor(recordDesc);
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- this.feedManager = runtimeCtx.getFeedManager();
- this.operandId = operationId;
- }
-
- @Override
- public void open() throws HyracksDataException {
- FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, runtimeType, operandId, partition);
- try {
- feedRuntime = feedManager.getFeedRuntime(runtimeId);
- if (feedRuntime == null) {
- feedRuntime = new FeedRuntime(feedId, partition, runtimeType, operandId);
- feedManager.registerFeedRuntime(feedRuntime);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Did not find a saved state from a previous zombie, starting a new instance for "
- + runtimeType + " node.");
- }
- resumeOldState = false;
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Retreived state from the zombie instance from previous execution for "
- + runtimeType + " node.");
- }
- resumeOldState = true;
- }
- FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
- runtimeType, partition, fta, feedManager);
- coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
- coreOperatorNodePushable.open();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to initialize feed operator " + feedRuntime + " [" + partition + "]");
- }
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- try {
- if (resumeOldState) {
- FeedRuntimeState runtimeState = feedRuntime.getRuntimeState();
- if (runtimeState != null) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("State from previous zombie instance " + feedRuntime.getRuntimeState());
- }
- coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
- feedRuntime.setRuntimeState(null);
- }
- resumeOldState = false;
- }
- currentBuffer = buffer;
- coreOperatorNodePushable.nextFrame(buffer);
- currentBuffer = null;
- } catch (HyracksDataException e) {
- if (policyEnforcer.getFeedPolicyAccessor().continueOnApplicationFailure()) {
- boolean isExceptionHarmful = e.getCause() instanceof TreeIndexException && !resumeOldState;
- if (isExceptionHarmful) {
- // TODO: log the tuple
- FeedRuntimeState runtimeState = new FeedRuntimeState(buffer, writer, e);
- feedRuntime.setRuntimeState(runtimeState);
- } else {
- // ignore the frame (exception is expected)
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Ignoring exception " + e);
- }
- }
- } else {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Feed policy does not require feed to survive soft failure");
- }
- throw e;
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " fail ");
- }
- if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
- if (currentBuffer != null) {
- FeedRuntimeState runtimeState = new FeedRuntimeState(currentBuffer, writer, null);
- feedRuntime.setRuntimeState(runtimeState);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Saved feed compute runtime for revivals" + feedRuntime.getFeedRuntimeId());
- }
- } else {
- feedManager.deRegisterFeedRuntime(feedRuntime.getFeedRuntimeId());
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("No state to save, de-registered feed runtime " + feedRuntime.getFeedRuntimeId());
- }
- }
- }
- coreOperatorNodePushable.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- coreOperatorNodePushable.close();
- feedManager.deRegisterFeedRuntime(feedRuntime.getFeedRuntimeId());
- }
-
- }
-
public IOperatorDescriptor getCoreOperator() {
return coreOperator;
}
+ public FeedRuntimeType getRuntimeType() {
+ return runtimeType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
new file mode 100644
index 0000000..1bb377c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
+
+ /** Runtime node pushable corresponding to the core feed operator **/
+ private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+ /**
+ * A policy enforcer that ensures dyanmic decisions for a feed are taken
+ * in accordance with the associated ingestion policy
+ **/
+ private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime
+ * captures the state of the operator while the feed is active.
+ */
+ private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents
+ * the flow of data from a feed to a dataset.
+ **/
+ private FeedConnectionId connectionId;
+
+ /**
+ * Denotes the i'th operator instance in a setting where K operator
+ * instances are scheduled to run in parallel
+ **/
+ private int partition;
+
+ private int nPartitions;
+
+ /** Type associated with the core feed operator **/
+ private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
+
+ /** The (singleton) instance of IFeedManager **/
+ private IFeedManager feedManager;
+
+ private FrameTupleAccessor fta;
+
+ private final IHyracksTaskContext ctx;
+
+ private final String operandId;
+
+ private FeedRuntimeInputHandler inputSideHandler;
+
+ public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+ int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+ this.ctx = ctx;
+ this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getFeedManager();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.feedManager = runtimeCtx.getFeedManager();
+ this.operandId = operationId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ try {
+ feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ if (feedRuntime == null) {
+ initializeNewFeedRuntime(runtimeId);
+ } else {
+ reviveOldFeedRuntime(runtimeId);
+ }
+
+ coreOperator.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Runtime not found for " + runtimeId + " connection id " + connectionId);
+ }
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+ policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
+ nPartitions);
+ setupBasicRuntime(inputSideHandler);
+ }
+
+ private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.inputSideHandler = feedRuntime.getInputHandler();
+ this.fta = new FrameTupleAccessor(recordDesc);
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ this.inputSideHandler.reset(nPartitions);
+ this.inputSideHandler.setCoreOperator(coreOperator);
+ feedRuntime.setMode(Mode.PROCESS);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Retreived state from the zombie instance from previous execution for " + runtimeType
+ + " node.");
+ }
+ }
+
+ private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, (FeedRuntime) feedRuntime);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ inputSideHandler.nextFrame(buffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+ }
+ feedRuntime.setMode(Mode.FAIL);
+ coreOperator.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ System.out.println("CLOSE CALLED FOR " + this.feedRuntime.getRuntimeId());
+ boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+ try {
+ if (!stalled) {
+ System.out.println("SIGNALLING END OF DATA for " + this.feedRuntime.getRuntimeId() + " mode is "
+ + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
+ inputSideHandler.nextFrame(null); // signal end of data
+ while (!inputSideHandler.isFinished()) {
+ synchronized (coreOperator) {
+ coreOperator.wait();
+ }
+ }
+ System.out.println("ABOUT TO CLOSE OPERATOR " + coreOperator);
+ }
+ coreOperator.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // ignore
+ } finally {
+ if (!stalled) {
+ deregister();
+ System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ } else {
+ System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ }
+ inputSideHandler.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Operator " + this.feedRuntime.getRuntimeId());
+ }
+ }
+ }
+
+ private void deregister() {
+ if (feedRuntime != null) {
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+ ((FeedRuntime) feedRuntime).getRuntimeId());
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
deleted file mode 100644
index fd9716c..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.Map;
-
-public class FeedPolicyAccessor {
- public static final String FAILURE_LOG_ERROR = "failure.log.error";
- public static final String APPLICATION_FAILURE_LOG_DATA = "application.failure.log.data";
- public static final String APPLICATION_FAILURE_CONTINUE = "application.failure.continue";
- public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
- public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
- public static final String COLLECT_STATISTICS = "collect.statistics";
- public static final String COLLECT_STATISTICS_PERIOD = "collect.statistics.period";
- public static final String COLLECT_STATISTICS_PERIOD_UNIT = "collect.statistics.period.unit";
- public static final String ELASTIC = "elastic";
-
- public enum TimeUnit {
- SEC,
- MIN,
- HRS,
- DAYS
- }
-
- private Map<String, String> feedPolicy;
-
- public FeedPolicyAccessor(Map<String, String> feedPolicy) {
- this.feedPolicy = feedPolicy;
- }
-
- public boolean logErrorOnFailure() {
- return getBooleanPropertyValue(FAILURE_LOG_ERROR);
- }
-
- public boolean logDataOnApplicationFailure() {
- return getBooleanPropertyValue(APPLICATION_FAILURE_LOG_DATA);
- }
-
- public boolean continueOnApplicationFailure() {
- return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE);
- }
-
- public boolean continueOnHardwareFailure() {
- return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE);
- }
-
- public boolean autoRestartOnClusterReboot() {
- return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART);
- }
-
- public boolean collectStatistics() {
- return getBooleanPropertyValue(COLLECT_STATISTICS);
- }
-
- public long getStatisicsCollectionPeriodInSecs() {
- return getIntegerPropertyValue(COLLECT_STATISTICS_PERIOD) * getTimeUnitFactor();
- }
-
- public boolean isElastic() {
- return getBooleanPropertyValue(ELASTIC);
- }
-
- private int getTimeUnitFactor() {
- String v = feedPolicy.get(COLLECT_STATISTICS_PERIOD_UNIT);
- int factor = 1;
- switch (TimeUnit.valueOf(v)) {
- case SEC:
- factor = 1;
- break;
- case MIN:
- factor = 60;
- break;
- case HRS:
- factor = 3600;
- break;
- case DAYS:
- factor = 216000;
- break;
-
- }
- return factor;
- }
-
- private boolean getBooleanPropertyValue(String key) {
- String v = feedPolicy.get(key);
- return v == null ? false : Boolean.valueOf(v);
- }
-
- private int getIntegerPropertyValue(String key) {
- String v = feedPolicy.get(key);
- return Integer.parseInt(v);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
index 44487ec..5cd4bf1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
@@ -15,60 +15,32 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.rmi.RemoteException;
-import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
public class FeedPolicyEnforcer {
- private final FeedConnectionId feedId;
- private final FeedPolicyAccessor feedPolicyAccessor;
- private final FeedActivity feedActivity;
+ private final FeedConnectionId connectionId;
+ private final FeedPolicyAccessor policyAccessor;
- public FeedPolicyEnforcer(FeedConnectionId feedId, Map<String, String> feedPolicy) {
- this.feedId = feedId;
- this.feedPolicyAccessor = new FeedPolicyAccessor(feedPolicy);
- this.feedActivity = new FeedActivity(feedId.getDataverse(), feedId.getFeedName(), feedId.getDatasetName(),
- null, new HashMap<String, String>());
+ public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
+ this.connectionId = feedConnectionId;
+ this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
}
public boolean continueIngestionPostSoftwareFailure(Exception e) throws RemoteException, ACIDException {
- boolean continueIngestion = feedPolicyAccessor.continueOnApplicationFailure();
- if (feedPolicyAccessor.logErrorOnFailure()) {
- persistExceptionDetails(e);
- }
- return continueIngestion;
- }
-
- private synchronized void persistExceptionDetails(Exception e) throws RemoteException, ACIDException {
- MetadataManager.INSTANCE.acquireWriteLatch();
- MetadataTransactionContext ctx = null;
- try {
- ctx = MetadataManager.INSTANCE.beginTransaction();
- feedActivity.setActivityType(FeedActivityType.FEED_FAILURE);
- feedActivity.getFeedActivityDetails().put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE,
- e.getMessage());
- MetadataManager.INSTANCE.registerFeedActivity(ctx, feedId, feedActivity);
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e2) {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- } finally {
- MetadataManager.INSTANCE.releaseWriteLatch();
- }
+ return policyAccessor.continueOnSoftFailure();
}
public FeedPolicyAccessor getFeedPolicyAccessor() {
- return feedPolicyAccessor;
+ return policyAccessor;
}
public FeedConnectionId getFeedId() {
- return feedId;
+ return connectionId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
deleted file mode 100644
index d3225a2..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager.FeedReportMessageType;
-
-public class FeedReport implements Comparable {
-
- private FeedConnectionId feedId;
- private FeedReportMessageType reportType;
- private int partition = -1;
- private FeedRuntimeType runtimeType;
- private long value = -1;
- private String[] representation;
-
- public FeedReport() {
- }
-
- public FeedReport(String message) {
- representation = message.split("\\|");
- }
-
- public void reset(String message) {
- representation = message.split("\\|");
- reportType = null;
- feedId = null;
- runtimeType = null;
- partition = -1;
- value = -1;
- }
-
- @Override
- public String toString() {
- return getFeedId() + " " + getReportType() + " " + getPartition() + " " + getRuntimeType() + " " + getValue();
- }
-
- public FeedConnectionId getFeedId() {
- if (feedId == null) {
- String feedIdRep = representation[1];
- String[] feedIdComp = feedIdRep.split(":");
- feedId = new FeedConnectionId(feedIdComp[0], feedIdComp[1], feedIdComp[2]);
- }
- return feedId;
- }
-
- public FeedReportMessageType getReportType() {
- if (reportType == null) {
- reportType = FeedReportMessageType.valueOf(representation[0].toUpperCase());
- }
- return reportType;
- }
-
- public int getPartition() {
- if (partition < 0) {
- partition = Integer.parseInt(representation[3]);
- }
- return partition;
- }
-
- public FeedRuntimeType getRuntimeType() {
- if (runtimeType == null) {
- runtimeType = FeedRuntimeType.valueOf(representation[2].toUpperCase());
- }
- return runtimeType;
- }
-
- public long getValue() {
- if (value < 0) {
- value = Long.parseLong(representation[4]);
- }
- return value;
- }
-
- public String[] getRepresentation() {
- return representation;
- }
-
- @Override
- public int compareTo(Object o) {
- if (!(o instanceof FeedReport)) {
- throw new IllegalArgumentException("Incorrect operand type " + o);
- }
-
- FeedReport other = (FeedReport) o;
- if (!other.getReportType().equals(getReportType())) {
- throw new IllegalArgumentException("Incorrect operand type " + o);
- }
-
- int returnValue = 0;
-
- switch (getReportType()) {
- case CONGESTION:
- returnValue = ranking.get(getRuntimeType()) - ranking.get(other.getRuntimeType());
- break;
-
- case THROUGHPUT:
- returnValue = (int) (other.getValue() - getValue());
- break;
- }
-
- return returnValue;
- }
-
- private static Map<FeedRuntimeType, Integer> ranking = populateRanking();
-
- private static Map<FeedRuntimeType, Integer> populateRanking() {
- Map<FeedRuntimeType, Integer> ranking = new HashMap<FeedRuntimeType, Integer>();
- ranking.put(FeedRuntimeType.INGESTION, 1);
- ranking.put(FeedRuntimeType.COMPUTE, 2);
- ranking.put(FeedRuntimeType.STORAGE, 3);
- ranking.put(FeedRuntimeType.COMMIT, 4);
- return ranking;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java
new file mode 100644
index 0000000..48102ba
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+
+public class FeedSubscriptionManager implements IFeedSubscriptionManager {
+
+ private static Logger LOGGER = Logger.getLogger(FeedSubscriptionManager.class.getName());
+
+ private final String nodeId;
+
+ private final Map<SubscribableFeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
+
+ public FeedSubscriptionManager(String nodeId) {
+ this.nodeId = nodeId;
+ this.subscribableRuntimes = new HashMap<SubscribableFeedRuntimeId, ISubscribableRuntime>();
+ }
+
+ @Override
+ public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
+ SubscribableFeedRuntimeId sid = (SubscribableFeedRuntimeId) subscribableRuntime.getRuntimeId();
+ if (!subscribableRuntimes.containsKey(sid)) {
+ subscribableRuntimes.put(sid, subscribableRuntime);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered feed subscribable runtime " + subscribableRuntime);
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Feed ingestion runtime " + subscribableRuntime + " already registered.");
+ }
+ }
+ }
+
+ @Override
+ public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableFeedRuntimeId) {
+ return subscribableRuntimes.get(subscribableFeedRuntimeId);
+ }
+
+ @Override
+ public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId ingestionId) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("De-registered feed subscribable runtime " + ingestionId);
+ }
+ subscribableRuntimes.remove(ingestionId);
+ }
+
+ @Override
+ public String toString() {
+ return "IngestionManager [" + nodeId + "]";
+ }
+
+}