You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:04 UTC
[10/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
new file mode 100644
index 0000000..3cb5d64
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedRuntimeManager;
+import org.apache.asterix.external.feed.message.EndFeedMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.external.feed.message.PrepareStallMessage;
+import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import org.apache.asterix.external.feed.watch.StorageSideMonitoredBuffer;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
+ * a feed message to the local feed manager on the host node controller.
+ * @see FeedMessageOperatorDescriptor
+ * IFeedMessage
+ * IFeedManager
+ */
+public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
+
+ private final FeedConnectionId connectionId;
+ private final IFeedMessage message;
+ private final IFeedManager feedManager;
+ private final int partition;
+
+ public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
+ IFeedMessage feedMessage, int partition, int nPartitions) {
+ this.connectionId = connectionId;
+ this.message = feedMessage;
+ this.partition = partition;
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ writer.open();
+ switch (message.getMessageType()) {
+ case END:
+ EndFeedMessage endFeedMessage = (EndFeedMessage) message;
+ switch (endFeedMessage.getEndMessageType()) {
+ case DISCONNECT_FEED:
+ hanldeDisconnectFeedTypeMessage(endFeedMessage);
+ break;
+ case DISCONTINUE_SOURCE:
+ handleDiscontinueFeedTypeMessage(endFeedMessage);
+ break;
+ }
+ break;
+ case PREPARE_STALL: {
+ handlePrepareStallMessage((PrepareStallMessage) message);
+ break;
+ }
+ case TERMINATE_FLOW: {
+ FeedConnectionId connectionId = ((TerminateDataFlowMessage) message).getConnectionId();
+ handleTerminateFlowMessage(connectionId);
+ break;
+ }
+ case COMMIT_ACK_RESPONSE: {
+ handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
+ break;
+ }
+ case THROTTLING_ENABLED: {
+ handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
+ break;
+ }
+ default:
+ break;
+
+ }
+
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+
+ private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
+ FeedConnectionId connectionId = throttlingMessage.getConnectionId();
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+ for (FeedRuntimeId runtimeId : runtimes) {
+ if (runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
+ FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
+ ((StorageSideMonitoredBuffer) (storeRuntime.getInputHandler().getmBuffer())).setAcking(false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline "
+ + connectionId);
+ }
+ }
+ }
+ }
+
+ private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
+ FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+ for (FeedRuntimeId runtimeId : runtimes) {
+ FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+ switch (runtimeId.getFeedRuntimeType()) {
+ case COLLECT:
+ FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler) runtime
+ .getInputHandler();
+ int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
+ inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
+ break;
+ case STORE:
+ MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer()
+ .getStorageTimeTrackingRateTask();
+ sTask.receiveCommitAckResponse(commitResponseMessage);
+ break;
+ default:
+ break;
+ }
+ }
+
+ commitResponseMessage.getIntakePartition();
+ SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.INTAKE,
+ partition);
+ IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
+ .getSubscribableRuntime(sid);
+ if (ingestionRuntime != null) {
+ IIntakeProgressTracker tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker();
+ if (tracker != null) {
+ tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
+ }
+ }
+ }
+
+ private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+
+ boolean found = false;
+ for (FeedRuntimeId runtimeId : feedRuntimes) {
+ FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+ if (runtime.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
+ ((CollectionRuntime) runtime).getFrameCollector().setState(State.HANDOVER);
+ found = true;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switched " + runtime + " to Hand Over stage");
+ }
+ }
+ }
+ if (!found) {
+ throw new HyracksDataException("COLLECT Runtime not found!");
+ }
+ }
+
+ private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
+ FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
+ int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
+ FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+ Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+ for (FeedRuntimeId runtimeId : feedRuntimes) {
+ FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+ switch (runtimeId.getFeedRuntimeType()) {
+ case COMPUTE:
+ Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? Mode.STALL
+ : Mode.END;
+ runtime.setMode(requiredMode);
+ break;
+ default:
+ runtime.setMode(Mode.STALL);
+ break;
+ }
+ }
+ }
+
+ private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+ FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
+ SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+ FeedRuntimeType.INTAKE, partition);
+ ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
+ .getSubscribableRuntime(subscribableRuntimeId);
+ IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
+ adapterRuntimeManager.stop();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
+ }
+ }
+
+ private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
+ }
+ FeedRuntimeId runtimeId = null;
+ FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
+ if (endFeedMessage.isCompleteDisconnection()) {
+ // subscribableRuntimeType represents the location at which the feed connection receives data
+ FeedRuntimeType runtimeType = null;
+ switch (subscribableRuntimeType) {
+ case INTAKE:
+ runtimeType = FeedRuntimeType.COLLECT;
+ break;
+ case COMPUTE:
+ runtimeType = FeedRuntimeType.COMPUTE_COLLECT;
+ break;
+ default:
+ throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
+ }
+
+ runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+ CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
+ .getFeedRuntime(connectionId, runtimeId);
+ feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
+ }
+ } else {
+ // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+ switch (subscribableRuntimeType) {
+ case INTAKE:
+ // illegal state as data hand-off from one feed to another does not happen at intake
+ throw new IllegalStateException("Illegal State, invalid runtime type " + subscribableRuntimeType);
+ case COMPUTE:
+ // feed could be primary or secondary, doesn't matter
+ SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
+ connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
+ ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
+ .getSubscribableRuntime(feedSubscribableRuntimeId);
+ DistributeFeedFrameWriter dWriter = feedRuntime.getFeedFrameWriter();
+ Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
+
+ IFrameWriter unsubscribingWriter = null;
+ for (Entry<IFrameWriter, FeedFrameCollector> entry : registeredCollectors.entrySet()) {
+ IFrameWriter frameWriter = entry.getKey();
+ FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler) frameWriter;
+ if (feedFrameWriter.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
+ unsubscribingWriter = feedFrameWriter;
+ dWriter.unsubscribeFeed(unsubscribingWriter);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
+ }
+ break;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Unsubscribed from feed :" + connectionId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
new file mode 100644
index 0000000..80a54be
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.SubscribableRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/*
+ * This IFrameWriter doesn't follow the contract
+ */
+public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
+
+ /** Runtime node pushable corresponding to the core feed operator **/
+ private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+ /**
+ * A policy enforcer that ensures dynamic decisions for a feed are taken
+ * in accordance with the associated ingestion policy
+ **/
+ private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime
+ * captures the state of the operator while the feed is active.
+ */
+ private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents
+ * the flow of data from a feed to a dataset.
+ **/
+ private FeedConnectionId connectionId;
+
+ /**
+ * Denotes the i'th operator instance in a setting where K operator
+ * instances are scheduled to run in parallel
+ **/
+ private int partition;
+
+ private int nPartitions;
+
+ /** The (singleton) instance of IFeedManager **/
+ private IFeedManager feedManager;
+
+ private FrameTupleAccessor fta;
+
+ private final IHyracksTaskContext ctx;
+
+ private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
+
+ private FeedRuntimeInputHandler inputSideHandler;
+
+ public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+ int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+ this.ctx = ctx;
+ this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject()).getFeedManager();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
+ try {
+ feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ if (feedRuntime == null) {
+ initializeNewFeedRuntime(runtimeId);
+ } else {
+ reviveOldFeedRuntime(runtimeId);
+ }
+ writer.open();
+ coreOperator.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+ policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
+
+ DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+ writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+ coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+
+ feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
+ recordDesc);
+ feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+
+ distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+ }
+
+ private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = feedRuntime.getInputHandler();
+ this.inputSideHandler.setCoreOperator(coreOperator);
+
+ DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+ writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+ coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+ distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+
+ inputSideHandler.reset(nPartitions);
+ feedRuntime.setMode(Mode.PROCESS);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ inputSideHandler.nextFrame(buffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
+ }
+ feedRuntime.setMode(Mode.FAIL);
+ coreOperator.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+ boolean end = inputSideHandler.getMode().equals(Mode.END);
+ try {
+ if (inputSideHandler != null) {
+ if (!(stalled || end)) {
+ inputSideHandler.nextFrame(null); // signal end of data
+ while (!inputSideHandler.isFinished()) {
+ synchronized (coreOperator) {
+ coreOperator.wait();
+ }
+ }
+ } else {
+ inputSideHandler.setFinished(true);
+ }
+ }
+ coreOperator.close();
+ System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (!stalled) {
+ deregister();
+ System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ } else {
+ System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ }
+ if (inputSideHandler != null) {
+ inputSideHandler.close();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Operator " + this.feedRuntime.getRuntimeId());
+ }
+ }
+ }
+
+ private void deregister() {
+ if (feedRuntime != null) {
+ // deregister from subscription manager
+ SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
+ feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
+
+ // deregister from connection manager
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
new file mode 100644
index 0000000..4dae72d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
+
+ /** Runtime node pushable corresponding to the core feed operator **/
+ private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+ /**
+ * A policy enforcer that ensures dyanmic decisions for a feed are taken
+ * in accordance with the associated ingestion policy
+ **/
+ private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime
+ * captures the state of the operator while the feed is active.
+ */
+ private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents
+ * the flow of data from a feed to a dataset.
+ **/
+ private FeedConnectionId connectionId;
+
+ /**
+ * Denotes the i'th operator instance in a setting where K operator
+ * instances are scheduled to run in parallel
+ **/
+ private int partition;
+
+ /** Total number of partitions available **/
+ private int nPartitions;
+
+ /** Type associated with the core feed operator **/
+ private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
+
+ /** The (singleton) instance of IFeedManager **/
+ private IFeedManager feedManager;
+
+ private FrameTupleAccessor fta;
+
+ private final IHyracksTaskContext ctx;
+
+ private final String operandId;
+
+ /** The pre-processor associated with this runtime **/
+ private FeedRuntimeInputHandler inputSideHandler;
+
+ public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+ this.ctx = ctx;
+ this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject()).getFeedManager();
+ this.operandId = operationId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ try {
+ feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ if (feedRuntime == null) {
+ initializeNewFeedRuntime(runtimeId);
+ } else {
+ reviveOldFeedRuntime(runtimeId);
+ }
+ coreOperator.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+ policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager, nPartitions);
+
+ setupBasicRuntime(inputSideHandler);
+ }
+
+ private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.inputSideHandler = feedRuntime.getInputHandler();
+ this.fta = new FrameTupleAccessor(recordDesc);
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ feedRuntime.setMode(Mode.PROCESS);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
+ }
+ }
+
+ private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ inputSideHandler.nextFrame(buffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+ }
+ feedRuntime.setMode(Mode.FAIL);
+ coreOperator.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ coreOperator.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // ignore
+ } finally {
+ if (inputSideHandler != null) {
+ inputSideHandler.close();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Operator " + this.feedRuntime.getRuntimeId());
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
new file mode 100644
index 0000000..9eb6c78
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * FeedMetaOperatorDescriptor is a wrapper operator that provides a sanboox like
+ * environment for an hyracks operator that is part of a feed ingestion
+ * pipeline. The MetaFeed operator provides an interface iden- tical to that
+ * offered by the underlying wrapped operator, hereafter referred to as the core
+ * operator. As seen by Hyracks, the altered pipeline is identical to the
+ * earlier version formed from core operators. The MetaFeed operator enhances
+ * each core operator by providing functionality for handling runtime
+ * exceptions, saving any state for future retrieval, and measuring/reporting of
+ * performance characteristics. We next describe how the added functionality
+ * contributes to providing fault- tolerance.
+ */
+
+public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The actual (Hyracks) operator that is wrapped around by the MetaFeed
+ * operator.
+ **/
+ private IOperatorDescriptor coreOperator;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents the
+ * flow of data from a feed to a dataset.
+ **/
+ private final FeedConnectionId feedConnectionId;
+
+ /**
+ * The policy associated with the feed instance.
+ **/
+ private final Map<String, String> feedPolicyProperties;
+
+ /**
+ * type for the feed runtime associated with the operator.
+ * Possible values: COMPUTE, STORE, OTHER
+ **/
+ private final FeedRuntimeType runtimeType;
+
+ private final String operandId;
+
+ public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
+ IOperatorDescriptor coreOperatorDescriptor, Map<String, String> feedPolicyProperties,
+ FeedRuntimeType runtimeType, boolean enableSubscriptionMode, String operandId) {
+ super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
+ this.feedConnectionId = feedConnectionId;
+ this.feedPolicyProperties = feedPolicyProperties;
+ if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
+ recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
+ }
+ this.coreOperator = coreOperatorDescriptor;
+ this.runtimeType = runtimeType;
+ this.operandId = operandId;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ IOperatorNodePushable nodePushable = null;
+ switch (runtimeType) {
+ case COMPUTE:
+ nodePushable = new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions,
+ coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+ break;
+ case STORE:
+ nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
+ coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+ break;
+ case OTHER:
+ nodePushable = new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
+ feedConnectionId, feedPolicyProperties, operandId);
+ break;
+ case ETS:
+ nodePushable = ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx,
+ recordDescProvider, partition, nPartitions);
+ break;
+ case JOIN:
+ break;
+ default:
+ throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
+ }
+ return nodePushable;
+ }
+
+ @Override
+ public String toString() {
+ return "FeedMeta [" + coreOperator + " ]";
+ }
+
+ public IOperatorDescriptor getCoreOperator() {
+ return coreOperator;
+ }
+
+ public FeedRuntimeType getRuntimeType() {
+ return runtimeType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
new file mode 100644
index 0000000..f75b3eb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
+
+ /** Runtime node pushable corresponding to the core feed operator **/
+ private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+ /**
+ * A policy enforcer that ensures dyanmic decisions for a feed are taken
+ * in accordance with the associated ingestion policy
+ **/
+ private FeedPolicyEnforcer policyEnforcer;
+
+ /**
+ * The Feed Runtime instance associated with the operator. Feed Runtime
+ * captures the state of the operator while the feed is active.
+ */
+ private FeedRuntime feedRuntime;
+
+ /**
+ * A unique identifier for the feed instance. A feed instance represents
+ * the flow of data from a feed to a dataset.
+ **/
+ private FeedConnectionId connectionId;
+
+ /**
+ * Denotes the i'th operator instance in a setting where K operator
+ * instances are scheduled to run in parallel
+ **/
+ private int partition;
+
+ private int nPartitions;
+
+ /** Type associated with the core feed operator **/
+ private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
+
+ /** The (singleton) instance of IFeedManager **/
+ private IFeedManager feedManager;
+
+ private FrameTupleAccessor fta;
+
+ private final IHyracksTaskContext ctx;
+
+ private final String operandId;
+
+ private FeedRuntimeInputHandler inputSideHandler;
+
+ public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+ int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+ this.ctx = ctx;
+ this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ .getApplicationObject()).getFeedManager();
+ this.operandId = operationId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ try {
+ feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ if (feedRuntime == null) {
+ initializeNewFeedRuntime(runtimeId);
+ } else {
+ reviveOldFeedRuntime(runtimeId);
+ }
+
+ coreOperator.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Runtime not found for " + runtimeId + " connection id " + connectionId);
+ }
+ this.fta = new FrameTupleAccessor(recordDesc);
+ this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+ policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta,
+ recordDesc, feedManager, nPartitions);
+ if (coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
+ AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
+ if (!indexOp.isPrimary()) {
+ inputSideHandler.setBufferingEnabled(false);
+ }
+ }
+ setupBasicRuntime(inputSideHandler);
+ }
+
+ private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ this.inputSideHandler = feedRuntime.getInputHandler();
+ this.fta = new FrameTupleAccessor(recordDesc);
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ this.inputSideHandler.reset(nPartitions);
+ this.inputSideHandler.setCoreOperator(coreOperator);
+ feedRuntime.setMode(Mode.PROCESS);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(
+ "Retreived state from the zombie instance from previous execution for " + runtimeType + " node.");
+ }
+ }
+
+ private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+ coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+ feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ inputSideHandler.nextFrame(buffer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+ }
+ feedRuntime.setMode(Mode.FAIL);
+ coreOperator.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ System.out.println("CLOSE CALLED FOR " + this.feedRuntime.getRuntimeId());
+ boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+ try {
+ if (!stalled) {
+ System.out.println("SIGNALLING END OF DATA for " + this.feedRuntime.getRuntimeId() + " mode is "
+ + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
+ inputSideHandler.nextFrame(null); // signal end of data
+ while (!inputSideHandler.isFinished()) {
+ synchronized (coreOperator) {
+ coreOperator.wait();
+ }
+ }
+ System.out.println("ABOUT TO CLOSE OPERATOR " + coreOperator);
+ }
+ coreOperator.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // ignore
+ } finally {
+ if (!stalled) {
+ deregister();
+ System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ } else {
+ System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+ }
+ inputSideHandler.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Operator " + this.feedRuntime.getRuntimeId());
+ }
+ }
+ }
+
+ private void deregister() {
+ if (feedRuntime != null) {
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 860d35f..129b62f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.parser;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -1145,4 +1146,10 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
recordBuilderPool.reset();
abvsBuilderPool.reset();
}
+
    /**
     * Re-points the ADM lexer at a new input stream so the parser instance
     * can be reused across inputs without reallocation.
     * NOTE(review): InputStreamReader here uses the platform default charset
     * -- confirm UTF-8 is not required for ADM input.
     */
    @Override
    public boolean reset(InputStream in) throws IOException {
        admLexer.reInit(new InputStreamReader(in));
        return true;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 146064a..6c399c3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -198,11 +198,17 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
}
@Override
- public void setInputStream(InputStream in) throws Exception {
+ public void setInputStream(InputStream in) throws IOException {
cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
if (in != null && hasHeader) {
cursor.nextRecord();
while (cursor.nextField());
}
}
+
    /**
     * Rebuilds the field cursor over a new stream. Unlike setInputStream(),
     * reset() does not skip a header record.
     * NOTE(review): InputStreamReader here uses the platform default charset
     * -- confirm it matches the expected input encoding.
     */
    @Override
    public boolean reset(InputStream in) throws IOException {
        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
        return true;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 649ca43..c5b39df 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -30,10 +30,6 @@ import org.apache.asterix.external.api.IIndexingAdapterFactory;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.library.ExternalLibraryManager;
-import org.apache.asterix.external.runtime.GenericSocketFeedAdapter;
-import org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory;
-import org.apache.asterix.external.runtime.SocketClientAdapter;
-import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
@@ -47,16 +43,16 @@ public class AdapterFactoryProvider {
Map<String, Class<? extends IAdapterFactory>> adapterFactories = new HashMap<String, Class<? extends IAdapterFactory>>();
// Class names
adapterFactories.put(GenericAdapter.class.getName(), GenericAdapterFactory.class);
- adapterFactories.put(GenericSocketFeedAdapter.class.getName(), GenericSocketFeedAdapterFactory.class);
- adapterFactories.put(SocketClientAdapter.class.getName(), SocketClientAdapterFactory.class);
-
// Aliases
adapterFactories.put(ExternalDataConstants.ALIAS_GENERIC_ADAPTER, GenericAdapterFactory.class);
adapterFactories.put(ExternalDataConstants.ALIAS_HDFS_ADAPTER, GenericAdapterFactory.class);
adapterFactories.put(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, GenericAdapterFactory.class);
- adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericSocketFeedAdapterFactory.class);
- adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, SocketClientAdapterFactory.class);
+ adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericAdapterFactory.class);
+ adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, GenericAdapterFactory.class);
adapterFactories.put(ExternalDataConstants.ALIAS_FILE_FEED_ADAPTER, GenericAdapterFactory.class);
+ adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_PULL_ADAPTER, GenericAdapterFactory.class);
+ adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_PUSH_ADAPTER, GenericAdapterFactory.class);
+ adapterFactories.put(ExternalDataConstants.ALIAS_LOCALFS_PUSH_ADAPTER, GenericAdapterFactory.class);
// Compatability
adapterFactories.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 68a3942..dfe7aed 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -28,14 +28,19 @@ import org.apache.asterix.external.api.IInputStreamProvider;
import org.apache.asterix.external.api.IInputStreamProviderFactory;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordFlowController;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
+import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
import org.apache.asterix.external.dataflow.IndexingDataFlowController;
import org.apache.asterix.external.dataflow.RecordDataFlowController;
import org.apache.asterix.external.dataflow.StreamDataFlowController;
import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -60,9 +65,11 @@ public class DataflowControllerProvider {
Map<String, String> configuration, boolean indexingOp) throws Exception {
switch (dataSourceFactory.getDataSourceType()) {
case RECORDS:
- RecordDataFlowController recordDataFlowController;
+ IRecordFlowController recordDataFlowController = null;
if (indexingOp) {
recordDataFlowController = new IndexingDataFlowController();
+ } else if (ExternalDataUtils.isFeed(configuration)) {
+ recordDataFlowController = new FeedRecordDataFlowController();
} else {
recordDataFlowController = new RecordDataFlowController();
}
@@ -77,7 +84,12 @@ public class DataflowControllerProvider {
recordDataFlowController.setRecordParser(dataParser);
return recordDataFlowController;
case STREAM:
- StreamDataFlowController streamDataFlowController = new StreamDataFlowController();
+ IStreamFlowController streamDataFlowController = null;
+ if (ExternalDataUtils.isFeed(configuration)) {
+ streamDataFlowController = new FeedStreamDataFlowController();
+ } else {
+ streamDataFlowController = new StreamDataFlowController();
+ }
streamDataFlowController.configure(configuration, ctx);
streamDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index c69e12c..a7ab062 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -27,6 +27,7 @@ import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -92,8 +93,12 @@ public class DatasourceFactoryProvider {
.setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory(
ExternalDataUtils.getRecordReaderStreamName(configuration), configuration));;
break;
+ case ExternalDataConstants.READER_TWITTER_PULL:
+ case ExternalDataConstants.READER_TWITTER_PUSH:
+ readerFactory = new TwitterRecordReaderFactory();
+ break;
default:
- throw new AsterixException("unknown input stream factory");
+ throw new AsterixException("unknown record reader factory");
}
}
return readerFactory;