You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:02 UTC
[05/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
new file mode 100644
index 0000000..48d1dc6
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedCollectRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector.State;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedIntakeOperationDescriptor}
+ */
+public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+ private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
+
+ private final int partition;
+ private final FeedConnectionId connectionId;
+ private final Map<String, String> feedPolicy;
+ private final FeedPolicyAccessor policyAccessor;
+ private final IFeedManager feedManager;
+ private final ISubscribableRuntime sourceRuntime;
+ private final IHyracksTaskContext ctx;
+ private final int nPartitions;
+
+ private RecordDescriptor outputRecordDescriptor;
+ private FeedRuntimeInputHandler inputSideHandler;
+ private CollectionRuntime collectRuntime;
+
+ public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
+ FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
+ ISubscribableRuntime sourceRuntime) {
+ this.ctx = ctx;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.connectionId = feedConnectionId;
+ this.sourceRuntime = sourceRuntime;
+ this.feedPolicy = feedPolicy;
+ policyAccessor = new FeedPolicyAccessor(feedPolicy);
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ this.feedManager = runtimeCtx.getFeedManager();
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ outputRecordDescriptor = recordDesc;
+ FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
+ .getFeedRuntimeType();
+ switch (sourceRuntimeType) {
+ case INTAKE:
+ handleCompleteConnection();
+ break;
+ case COMPUTE:
+ handlePartialConnection();
+ break;
+ default:
+ throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
+ }
+
+ State state = collectRuntime.waitTillCollectionOver();
+ if (state.equals(State.FINISHED)) {
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+ collectRuntime.getRuntimeId());
+ writer.close();
+ inputSideHandler.close();
+ } else if (state.equals(State.HANDOVER)) {
+ inputSideHandler.setMode(Mode.STALL);
+ writer.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
+ + " and the output writer " + writer + " has been closed ");
+ }
+ }
+ } catch (InterruptedException ie) {
+ handleInterruptedException(ie);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void handleCompleteConnection() throws Exception {
+ FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
+ FeedRuntimeId.DEFAULT_OPERAND_ID);
+ collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
+ runtimeId);
+ if (collectRuntime == null) {
+ beginNewFeed(runtimeId);
+ } else {
+ reviveOldFeed();
+ }
+ }
+
+ private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
+ writer.open();
+ IFrameWriter outputSideWriter = writer;
+ if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType().equals(
+ FeedRuntimeType.COMPUTE)) {
+ outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
+ connectionId);
+ this.recordDesc = sourceRuntime.getRecordDescriptor();
+ }
+
+ FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
+ inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter, policyAccessor,
+ false, tupleAccessor, recordDesc,
+ feedManager, nPartitions);
+
+ collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
+ sourceRuntime, feedPolicy);
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+ sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+ }
+
+ private void reviveOldFeed() throws HyracksDataException {
+ writer.open();
+ collectRuntime.getFrameCollector().setState(State.ACTIVE);
+ inputSideHandler = collectRuntime.getInputHandler();
+
+ IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
+ if (innerWriter instanceof CollectTransformFeedFrameWriter) {
+ ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
+ } else {
+ inputSideHandler.setCoreOperator(writer);
+ }
+
+ inputSideHandler.setMode(Mode.PROCESS);
+ }
+
+ private void handlePartialConnection() throws Exception {
+ FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
+ FeedRuntimeId.DEFAULT_OPERAND_ID);
+ writer.open();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
+ }
+ IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
+ outputRecordDescriptor, connectionId);
+
+ inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
+ new FrameTupleAccessor(recordDesc), recordDesc, feedManager,
+ nPartitions);
+
+ collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
+ feedPolicy);
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+ recordDesc = sourceRuntime.getRecordDescriptor();
+ sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+ }
+
+ private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
+ if (policyAccessor.continueOnHardwareFailure()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
+ + " until failure is resolved");
+ }
+ inputSideHandler.setMode(Mode.STALL);
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
+ + " as feed is not configured to handle failures");
+ }
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
+ writer.close();
+ throw new HyracksDataException(ie);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java
new file mode 100644
index 0000000..eefb576
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
+
+/**
+ * An implementation of the IFeedManager interface.
+ * Provider necessary central repository for registering/retrieving
+ * artifacts/services associated with a feed.
+ */
+public class FeedConnectionManager implements IFeedConnectionManager {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
+
+ private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
+ private final String nodeId;
+
+ public FeedConnectionManager(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
+ return feedRuntimeManagers.get(feedId);
+ }
+
+ @Override
+ public void deregisterFeed(FeedConnectionId feedId) {
+ try {
+ FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+ if (mgr != null) {
+ mgr.close();
+ feedRuntimeManagers.remove(feedId);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
+ }
+ }
+
+ }
+
+ @Override
+ public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
+ throws Exception {
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+ if (runtimeMgr == null) {
+ runtimeMgr = new FeedRuntimeManager(connectionId, this);
+ feedRuntimeManagers.put(connectionId, runtimeMgr);
+ }
+ runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
+ }
+
+ @Override
+ public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+ if (runtimeMgr != null) {
+ runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
+ }
+ }
+
+ @Override
+ public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+ FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+ return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
+ }
+
+ @Override
+ public String toString() {
+ return "FeedManager " + "[" + nodeId + "]";
+ }
+
+ @Override
+ public List<FeedRuntimeId> getRegisteredRuntimes() {
+ List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
+ for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
+ runtimes.addAll(entry.getValue().getFeedRuntimes());
+ }
+ return runtimes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java
new file mode 100644
index 0000000..d131b95
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class FeedFrameTupleDecorator {
+
+ private AMutableString aString = new AMutableString("");
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AtomicInteger tupleId;
+
+ @SuppressWarnings("unchecked")
+ private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+ @SuppressWarnings("unchecked")
+ private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ @SuppressWarnings("unchecked")
+ private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+
+ private final int partition;
+ private final ArrayBackedValueStorage attrNameStorage;
+ private final ArrayBackedValueStorage attrValueStorage;
+
+ public FeedFrameTupleDecorator(int partition) {
+ this.tupleId = new AtomicInteger(0);
+ this.partition = partition;
+ this.attrNameStorage = new ArrayBackedValueStorage();
+ this.attrValueStorage = new ArrayBackedValueStorage();
+ }
+
+ public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
+ throws HyracksDataException, AsterixException {
+ attrNameStorage.reset();
+ aString.setValue(attrName);
+ stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+
+ attrValueStorage.reset();
+ aInt64.setValue(attrValue);
+ int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
+
+ recordBuilder.addField(attrNameStorage, attrValueStorage);
+ }
+
+ public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
+ throws HyracksDataException, AsterixException {
+ attrNameStorage.reset();
+ aString.setValue(attrName);
+ stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+
+ attrValueStorage.reset();
+ aInt32.setValue(attrValue);
+ int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
+
+ recordBuilder.addField(attrNameStorage, attrValueStorage);
+ }
+
+ public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+ addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, tupleId.incrementAndGet(), recordBuilder);
+ }
+
+ public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+ addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
+ }
+
+ public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+ addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
+ }
+
+ public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+ addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
deleted file mode 100644
index 899da77..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedMessageService;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager.FeedReportMessageType;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * A wrapper around the standard frame writer provided to an operator node pushable.
- * The wrapper monitors the flow of data from this operator to a downstream operator
- * over a connector. It collects statistics if required by the feed ingestion policy
- * and reports them to the Super Feed Manager chosen for the feed. In addition any
- * congestion experienced by the operator is also reported.
- */
-public class FeedFrameWriter implements IFrameWriter {
-
- private static final Logger LOGGER = Logger.getLogger(FeedFrameWriter.class.getName());
-
- /** The threshold for the time required in pushing a frame to the network. **/
- public static final long FLUSH_THRESHOLD_TIME = 5000; // 5 seconds
-
- /** Actual frame writer provided to an operator. **/
- private IFrameWriter writer;
-
- /** The node pushable associated with the operator **/
- private IOperatorNodePushable nodePushable;
-
- /** set to true if health need to be monitored **/
- private final boolean reportHealth;
-
- /** A buffer for keeping frames that are waiting to be processed **/
- private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
-
- /**
- * Mode associated with the frame writer
- * Possible values: FORWARD, STORE
- *
- * @see Mode
- */
- private Mode mode;
-
- /**
- * Detects if the operator is unable to push a frame downstream
- * within a threshold period of time. In addition, it measure the
- * throughput as observed on the output channel of the associated operator.
- */
- private HealthMonitor healthMonitor;
-
- /**
- * A Timer instance for managing scheduling of tasks.
- */
- private Timer timer;
-
- /**
- * Provides access to the tuples in a frame. Used in collecting statistics
- */
- private FrameTupleAccessor fta;
-
- public enum Mode {
- /**
- * **
- * Normal mode of operation for an operator when
- * frames are pushed to the downstream operator.
- */
- FORWARD,
-
- /**
- * Failure mode of operation for an operator when
- * input frames are not pushed to the downstream operator but
- * are buffered for future retrieval. This mode is adopted
- * during failure recovery.
- */
- STORE
- }
-
- public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
- FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
- FrameTupleAccessor fta, IFeedManager feedManager) {
- this.writer = writer;
- this.mode = Mode.FORWARD;
- this.nodePushable = nodePushable;
- this.reportHealth = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
- if (reportHealth) {
- timer = new Timer();
- healthMonitor = new HealthMonitor(feedId, nodeId, feedRuntimeType, partition, timer, fta, feedManager);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Statistics collection enabled for the feed " + feedId + " " + feedRuntimeType + " ["
- + partition + "]");
- }
- timer.scheduleAtFixedRate(healthMonitor, 0, FLUSH_THRESHOLD_TIME);
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Statistics collection *not* enabled for the feed " + feedId + " " + feedRuntimeType + " ["
- + partition + "]");
- }
- }
- this.fta = fta;
- }
-
- public Mode getMode() {
- return mode;
- }
-
- public void setMode(Mode newMode) throws HyracksDataException {
- if (this.mode.equals(newMode)) {
- return;
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Switching to :" + newMode + " from " + this.mode);
- }
- this.mode = newMode;
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- switch (mode) {
- case FORWARD:
- try {
- if (reportHealth) {
- fta.reset(buffer);
- healthMonitor.notifyStartFrameFlushActivity();
- writer.nextFrame(buffer);
- healthMonitor.notifyFinishFrameFlushActivity();
- } else {
- writer.nextFrame(buffer);
- }
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to write frame " + " on behalf of " + nodePushable.getDisplayName()
- + ":\n" + e);
- }
- }
- if (frames.size() > 0) {
- for (ByteBuffer buf : frames) {
- writer.nextFrame(buf);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Flushed old frame (from previous failed execution) : " + buf
- + " on behalf of " + nodePushable.getDisplayName());
- }
- }
- frames.clear();
- }
- break;
- case STORE:
-
- /* TODO:
- * Limit the in-memory space utilized during the STORE mode. The limit (expressed in bytes)
- * is a parameter specified as part of the feed ingestion policy. Below is a basic implemenation
- * that allocates a buffer on demand.
- * */
-
- ByteBuffer storageBuffer = ByteBuffer.allocate(buffer.capacity());
- storageBuffer.put(buffer);
- frames.add(storageBuffer);
- storageBuffer.flip();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Stored frame for " + nodePushable.getDisplayName());
- }
- break;
- }
- }
-
- /**
- * Detects if the operator is unable to push a frame downstream
- * within a threshold period of time. In addition, it measure the
- * throughput as observed on the output channel of the associated operator.
- */
- private static class HealthMonitor extends TimerTask {
-
- private static final String EOL = "\n";
-
- private long startTime = -1;
- private FramePushState state;
- private AtomicLong numTuplesInInterval = new AtomicLong(0);
- private boolean collectThroughput;
- private FeedMessageService mesgService;
-
- private final FeedConnectionId feedId;
- private final String nodeId;
- private final FeedRuntimeType feedRuntimeType;
- private final int partition;
- private final long period;
- private final FrameTupleAccessor fta;
- private final IFeedManager feedManager;
-
- public HealthMonitor(FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
- Timer timer, FrameTupleAccessor fta, IFeedManager feedManager) {
- this.state = FramePushState.INTIALIZED;
- this.feedId = feedId;
- this.nodeId = nodeId;
- this.feedRuntimeType = feedRuntimeType;
- this.partition = partition;
- this.period = FLUSH_THRESHOLD_TIME;
- this.collectThroughput = feedRuntimeType.equals(FeedRuntimeType.INGESTION);
- this.fta = fta;
- this.feedManager = feedManager;
- }
-
- public void notifyStartFrameFlushActivity() {
- startTime = System.currentTimeMillis();
- state = FramePushState.WAITING_FOR_FLUSH_COMPLETION;
- }
-
- /**
- * Reset method is invoked when a live instance of operator needs to take
- * over from the zombie instance from the previously failed execution
- */
- public void reset() {
- mesgService = null;
- collectThroughput = feedRuntimeType.equals(FeedRuntimeType.INGESTION);
- }
-
- public void notifyFinishFrameFlushActivity() {
- state = FramePushState.WAITNG_FOR_NEXT_FRAME;
- numTuplesInInterval.set(numTuplesInInterval.get() + fta.getTupleCount());
- }
-
- @Override
- public void run() {
- if (state.equals(FramePushState.WAITING_FOR_FLUSH_COMPLETION)) {
- long currentTime = System.currentTimeMillis();
- if (currentTime - startTime > FLUSH_THRESHOLD_TIME) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
- }
- sendReportToSuperFeedManager(currentTime - startTime, FeedReportMessageType.CONGESTION,
- System.currentTimeMillis());
- }
- }
- if (collectThroughput) {
- int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
- sendReportToSuperFeedManager(instantTput, FeedReportMessageType.THROUGHPUT, System.currentTimeMillis());
- }
- numTuplesInInterval.set(0);
- }
-
- private void sendReportToSuperFeedManager(long value, SuperFeedManager.FeedReportMessageType mesgType,
- long timestamp) {
- if (mesgService == null) {
- waitTillMessageServiceIsUp();
- }
- String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
- String message = mesgType.name().toLowerCase() + FeedMessageService.MessageSeparator + feedRep
- + FeedMessageService.MessageSeparator + feedRuntimeType + FeedMessageService.MessageSeparator
- + partition + FeedMessageService.MessageSeparator + value + FeedMessageService.MessageSeparator
- + nodeId + FeedMessageService.MessageSeparator + timestamp + FeedMessageService.MessageSeparator
- + EOL;
- try {
- mesgService.sendMessage(message);
- } catch (IOException ioe) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to send feed report to Super Feed Manager for feed " + feedId + " "
- + feedRuntimeType + "[" + partition + "]");
- }
- }
- }
-
- private void waitTillMessageServiceIsUp() {
- while (mesgService == null) {
- mesgService = feedManager.getFeedMessageService(feedId);
- if (mesgService == null) {
- try {
- /**
- * wait for the message service to be available
- */
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Encountered an interrupted exception " + " Exception " + e);
- }
- }
- }
- }
- }
-
- public void deactivate() {
- // cancel the timer task to avoid future execution.
- cancel();
- collectThroughput = false;
- }
-
- private enum FramePushState {
- /**
- * Frame writer has been initialized
- */
- INTIALIZED,
-
- /**
- * Frame writer is waiting for a pending flush to finish.
- */
- WAITING_FOR_FLUSH_COMPLETION,
-
- /**
- * Frame writer is waiting to be given the next frame.
- */
- WAITNG_FOR_NEXT_FRAME
- }
-
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- if(healthMonitor != null) {
- if (!healthMonitor.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
- healthMonitor.deactivate();
- } else {
- healthMonitor.reset();
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (healthMonitor != null) {
- healthMonitor.deactivate();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closing frame statistics collection activity" + healthMonitor);
- }
- }
- writer.close();
- }
-
- public IFrameWriter getWriter() {
- return writer;
- }
-
- public void setWriter(IFrameWriter writer) {
- this.writer = writer;
- }
-
- @Override
- public String toString() {
- return "MaterializingFrameWriter using " + writer;
- }
-
- public List<ByteBuffer> getStoredFrames() {
- return frames;
- }
-
- public void clear() {
- frames.clear();
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
- public void reset() {
- healthMonitor.reset();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 2b05989..1282f85 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -15,194 +15,117 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
/**
- * FeedIntakeOperatorDescriptor is responsible for ingesting data from an external source. This
- * operator uses a user specified for a built-in adapter for retrieving data from the external
- * data source.
+ * An operator responsible for establishing connection with external data source and parsing,
+ * translating the received content.It uses an instance of feed adaptor to perform these functions.
*/
public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
-
- /** The type associated with the ADM data output from the feed adapter */
- private final IAType outputType;
- /** unique identifier for a feed instance. */
- private final FeedConnectionId feedId;
+ private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
- /** Map representation of policy parameters */
- private final Map<String, String> feedPolicy;
+ /** The unique identifier of the feed that is being ingested. **/
+ private final FeedId feedId;
- /** The adapter factory that is used to create an instance of the feed adapter **/
- private IAdapterFactory adapterFactory;
+ private final FeedPolicyAccessor policyAccessor;
- /** The (singleton) instance of IFeedManager **/
- private IFeedManager feedManager;
+ /** The adaptor factory that is used to create an instance of the feed adaptor **/
+ private IFeedAdapterFactory adaptorFactory;
/** The library that contains the adapter in use. **/
- private String adapterLibraryName;
+ private String adaptorLibraryName;
/**
* The adapter factory class that is used to create an instance of the feed adapter.
* This value is used only in the case of external adapters.
**/
- private String adapterFactoryClassName;
+ private String adaptorFactoryClassName;
/** The configuration parameters associated with the adapter. **/
- private Map<String, String> adapterConfiguration;
+ private Map<String, String> adaptorConfiguration;
private ARecordType adapterOutputType;
- public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
- ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed,
+ IFeedAdapterFactory adapterFactory, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
super(spec, 0, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactory = adapterFactory;
- this.outputType = atype;
- this.feedId = feedId;
- this.feedPolicy = feedPolicy;
+ this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+ this.adaptorFactory = adapterFactory;
+ this.adapterOutputType = adapterOutputType;
+ this.policyAccessor = policyAccessor;
}
- public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, String adapterLibraryName,
- String adapterFactoryClassName, Map<String, String> configuration, ARecordType atype,
- RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, String adapterLibraryName,
+ String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
super(spec, 0, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactoryClassName = adapterFactoryClassName;
- this.adapterConfiguration = configuration;
- this.adapterLibraryName = adapterLibraryName;
- this.outputType = atype;
- this.feedId = feedId;
- this.feedPolicy = feedPolicy;
+ this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+ this.adaptorFactoryClassName = adapterFactoryClassName;
+ this.adaptorLibraryName = adapterLibraryName;
+ this.adaptorConfiguration = primaryFeed.getAdaptorConfiguration();
+ this.adapterOutputType = adapterOutputType;
+ this.policyAccessor = policyAccessor;
}
+ @Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- IFeedAdapter adapter = null;
- FeedRuntimeId feedRuntimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
- this.feedManager = runtimeCtx.getFeedManager();
- IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedRuntime(feedRuntimeId);
- try {
- if (ingestionRuntime == null) {
- // create an instance of a feed adapter to ingest data.
- adapter = createAdapter(ctx, partition);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Beginning new feed:" + feedId);
- }
- } else {
- // retrieve the instance of the feed adapter used in previous failed execution.
- adapter = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager().getFeedAdapter();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Resuming old feed:" + feedId);
- }
+ IFeedSubscriptionManager feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
+ SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+ partition);
+ IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
+ .getSubscribableRuntime(feedIngestionId);
+ if (adaptorFactory == null) {
+ try {
+ adaptorFactory = createExtenralAdapterFactory(ctx, partition);
+ } catch (Exception exception) {
+ throw new HyracksDataException(exception);
}
- } catch (Exception exception) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Initialization of the feed adapter failed with exception " + exception);
- }
- throw new HyracksDataException("Initialization of the feed adapter failed", exception);
- }
- return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, feedPolicy, partition, ingestionRuntime);
- }
-
- public FeedConnectionId getFeedId() {
- return feedId;
- }
-
- public Map<String, String> getFeedPolicy() {
- return feedPolicy;
- }
-
- public IAdapterFactory getAdapterFactory() {
- return adapterFactory;
- }
-
- public IAType getOutputType() {
- return outputType;
- }
- public RecordDescriptor getRecordDescriptor() {
- return recordDescriptors[0];
+ }
+ return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
+ policyAccessor);
}
- private IFeedAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- IFeedAdapter feedAdapter = null;
- if (adapterFactory != null) {
- feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ private IFeedAdapterFactory createExtenralAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
+ IFeedAdapterFactory adapterFactory = null;
+ ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+ adaptorLibraryName);
+ if (classLoader != null) {
+ adapterFactory = ((IFeedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
+ adapterFactory.configure(adaptorConfiguration, adapterOutputType);
} else {
- ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
- adapterLibraryName);
- if (classLoader != null) {
- IAdapterFactory adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adapterFactoryClassName)
- .newInstance()));
-
- switch (adapterFactory.getAdapterType()) {
- case TYPED: {
- ((ITypedAdapterFactory) adapterFactory).configure(adapterConfiguration);
- feedAdapter = (IFeedAdapter) ((ITypedAdapterFactory) adapterFactory).createAdapter(ctx,
- partition);
- }
- break;
- case GENERIC: {
- String outputTypeName = adapterConfiguration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
- if (outputTypeName == null) {
- throw new IllegalArgumentException(
- "You must specify the datatype associated with the incoming data. Datatype is specified by the "
- + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
- }
- ((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
- (ARecordType) adapterOutputType);
- ((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
- }
- break;
- }
-
- feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
- } else {
- String message = "Unable to create adapter as class loader not configured for library "
- + adapterLibraryName + " in dataverse " + feedId.getDataverse();
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe(message);
- }
- throw new IllegalArgumentException(message);
-
- }
+ String message = "Unable to create adapter as class loader not configured for library "
+ + adaptorLibraryName + " in dataverse " + feedId.getDataverse();
+ LOGGER.severe(message);
+ throw new IllegalArgumentException(message);
}
- return feedAdapter;
- }
-
- public String getAdapterLibraryName() {
- return adapterLibraryName;
+ return adapterFactory;
}
- public String getAdapterFactoryClassName() {
- return adapterFactoryClassName;
+ public FeedId getFeedId() {
+ return feedId;
}
- public Map<String, String> getAdapterConfiguration() {
- return adapterConfiguration;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index a6b97eb..3256420 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -14,133 +14,196 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter.DataExchangeMode;
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager.State;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.common.feeds.api.ISubscriberRuntime;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
- * The runtime for @see{FeedIntakeOperationDescriptor}
+ * The runtime for @see{FeedIntakeOperationDescriptor}.
+ * Provides the core functionality to set up the artifacts for ingestion of a feed.
+ * The artifacts are lazily activated when a feed receives a subscription request.
*/
public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+ private final FeedId feedId;
private final int partition;
- private final FeedConnectionId feedId;
- private final LinkedBlockingQueue<IFeedMessage> inbox;
- private final Map<String, String> feedPolicy;
- private final FeedPolicyEnforcer policyEnforcer;
- private final String nodeId;
- private final FrameTupleAccessor fta;
+ private final IFeedSubscriptionManager feedSubscriptionManager;
private final IFeedManager feedManager;
+ private final IHyracksTaskContext ctx;
+ private final IFeedAdapterFactory adapterFactory;
+ private final FeedPolicyAccessor policyAccessor;
- private FeedRuntime ingestionRuntime;
+ private IngestionRuntime ingestionRuntime;
private IFeedAdapter adapter;
- private FeedFrameWriter feedFrameWriter;
+ private IIntakeProgressTracker tracker;
+ private DistributeFeedFrameWriter feedFrameWriter;
- public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
- Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
- this.adapter = adapter;
- this.partition = partition;
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IFeedAdapterFactory adapterFactory,
+ int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
+ this.ctx = ctx;
this.feedId = feedId;
+ this.partition = partition;
this.ingestionRuntime = ingestionRuntime;
- inbox = new LinkedBlockingQueue<IFeedMessage>();
- this.feedPolicy = feedPolicy;
- policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
- nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
- fta = new FrameTupleAccessor(recordDesc);
+ this.adapterFactory = adapterFactory;
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
+ this.feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
this.feedManager = runtimeCtx.getFeedManager();
+ this.policyAccessor = policyAccessor;
}
@Override
public void initialize() throws HyracksDataException {
-
- AdapterRuntimeManager adapterRuntimeMgr = null;
+ IAdapterRuntimeManager adapterRuntimeManager = null;
try {
if (ingestionRuntime == null) {
- feedFrameWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
- FeedRuntimeType.INGESTION, partition, fta, feedManager);
- adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, feedFrameWriter, partition, inbox,
- feedManager);
-
- if (adapter.getDataExchangeMode().equals(DataExchangeMode.PULL)
- && adapter instanceof IPullBasedFeedAdapter) {
- ((IPullBasedFeedAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Beginning new feed:" + feedId);
+ try {
+ adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+ if (adapterFactory.isRecordTrackingEnabled()) {
+ tracker = adapterFactory.createIntakeProgressTracker();
+ }
+ } catch (Exception e) {
+ LOGGER.severe("Unable to create adapter : " + adapterFactory.getName() + "[" + partition + "]"
+ + " Exception " + e);
+ throw new HyracksDataException(e);
}
+ FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+ feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition, fta,
+ feedManager);
+ adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
+ SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+ partition);
+ ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
+ adapterRuntimeManager);
+ feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
feedFrameWriter.open();
- adapterRuntimeMgr.start();
} else {
- adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Resuming old feed:" + feedId);
+ if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+ ingestionRuntime.getAdapterRuntimeManager().setState(State.ACTIVE_INGESTION);
+ adapter = ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
+ + ingestionRuntime);
+ LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+ + " connected to backend for feed " + feedId);
+ }
+ feedFrameWriter = (DistributeFeedFrameWriter) ingestionRuntime.getFeedFrameWriter();
+ } else {
+ String message = "Feed Ingestion Runtime for feed " + feedId
+ + " is already registered and is active!.";
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
}
- adapter = adapterRuntimeMgr.getFeedAdapter();
- writer.open();
- adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
- adapterRuntimeMgr.getAdapterExecutor().getWriter().reset();
- adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
- feedFrameWriter = adapterRuntimeMgr.getAdapterExecutor().getWriter();
}
- ingestionRuntime = adapterRuntimeMgr.getIngestionRuntime();
- synchronized (adapterRuntimeMgr) {
- while (!adapterRuntimeMgr.getState().equals(State.FINISHED_INGESTION)) {
- adapterRuntimeMgr.wait();
- }
+ waitTillIngestionIsOver(adapterRuntimeManager);
+ feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime
+ .getRuntimeId());
+ if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
+ throw new HyracksDataException("Unable to ingest data");
}
- feedManager.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
- feedFrameWriter.close();
+
} catch (InterruptedException ie) {
- if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Continuing on failure as per feed policy, switching to INACTIVE INGESTION temporarily");
+ /*
+ * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
+ * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
+ * The surviving intake partitions must continue to live and receive data from the external source.
+ */
+ List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
+ FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
+ boolean needToHandleFailure = false;
+ List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
+ for (ISubscriberRuntime subscriber : subscribers) {
+ policyAccessor.reset(subscriber.getFeedPolicy());
+ if (!policyAccessor.continueOnHardwareFailure()) {
+ failingSubscribers.add(subscriber);
+ } else {
+ needToHandleFailure = true;
}
- adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
- FeedRuntimeManager runtimeMgr = feedManager.getFeedRuntimeManager(feedId);
+ }
+
+ for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
try {
- runtimeMgr.close(false);
- } catch (IOException ioe) {
+ ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
+ } catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to close Feed Runtime Manager " + ioe.getMessage());
+ LOGGER.warning("Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
}
}
- feedFrameWriter.fail();
+ }
+
+ if (needToHandleFailure) {
+ ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
+ }
} else {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Interrupted Exception, something went wrong");
+ LOGGER.info("Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
}
-
- feedManager.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
- feedFrameWriter.close();
+ feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime
+ .getRuntimeId());
throw new HyracksDataException(ie);
}
} catch (Exception e) {
e.printStackTrace();
throw new HyracksDataException(e);
+ } finally {
+ if (ingestionRuntime != null
+ && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+ feedFrameWriter.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
+ + ingestionRuntime.getAdapterRuntimeManager().getState());
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
+ + " Will resume after correcting failure");
+ }
+ }
+
}
}
- public Map<String, String> getFeedPolicy() {
- return feedPolicy;
+ private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Waiting for adaptor [" + partition + "]" + "to be done with ingestion of feed " + feedId);
+ }
+ synchronized (adapterRuntimeManager) {
+ while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION) || (adapterRuntimeManager
+ .getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
+ adapterRuntimeManager.wait();
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+ + " done with ingestion of feed " + feedId);
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
new file mode 100644
index 0000000..098d713
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+
+public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
+
+ private LinkedBlockingQueue<FeedLifecycleEvent> inbox;
+
+ public FeedLifecycleEventSubscriber() {
+ this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
+ }
+
+ @Override
+ public void handleFeedEvent(FeedLifecycleEvent event) {
+ inbox.add(event);
+ }
+
+ @Override
+ public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
+ boolean eventOccurred = false;
+ FeedLifecycleEvent e = null;
+ Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator();
+ while (eventsSoFar.hasNext()) {
+ e = eventsSoFar.next();
+ assertNoFailure(e);
+ eventOccurred = e.equals(event);
+ }
+
+ while (!eventOccurred) {
+ e = inbox.take();
+ eventOccurred = e.equals(event);
+ if (!eventOccurred) {
+ assertNoFailure(e);
+ }
+ }
+ }
+
+ private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
+ if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
+ throw new AsterixException("Failure in feed");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
deleted file mode 100644
index 8b92994..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedMessageService;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedManager implements IFeedManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
-
- private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
- private final String nodeId;
-
- public FeedManager(String nodeId) {
- this.nodeId = nodeId;
- }
-
- public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
- return feedRuntimeManagers.get(feedId);
- }
-
- public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
- FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
- return mgr == null ? null : mgr.getExecutorService();
- }
-
- @Override
- public FeedMessageService getFeedMessageService(FeedConnectionId feedId) {
- FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
- return mgr == null ? null : mgr.getMessageService();
- }
-
- @Override
- public void deregisterFeed(FeedConnectionId feedId) {
- try {
- FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
- if (mgr == null) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unknown feed id: " + feedId);
- }
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closing feed runtime manager: " + mgr);
- }
- mgr.close(true);
- }
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
- }
- e.printStackTrace();
- }
-
- feedRuntimeManagers.remove(feedId);
- }
-
- @Override
- public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
- FeedConnectionId feedId = feedRuntime.getFeedRuntimeId().getFeedId();
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
- if (runtimeMgr == null) {
- synchronized (feedRuntimeManagers) {
- if (runtimeMgr == null) {
- runtimeMgr = new FeedRuntimeManager(feedId, this);
- feedRuntimeManagers.put(feedId, runtimeMgr);
- }
- }
- }
-
- runtimeMgr.registerFeedRuntime(feedRuntime.getFeedRuntimeId(), feedRuntime);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered runtime " + feedRuntime + " for feed " + feedId);
- }
- }
-
- @Override
- public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
- if (runtimeMgr != null) {
- runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Deregistered Feed Runtime " + feedRuntimeId);
- }
- }
- }
-
- @Override
- public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
- return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
- }
-
- @Override
- public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
- if (runtimeMgr != null) {
- runtimeMgr.setSuperFeedManager(sfm);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered Super Feed Manager " + sfm);
- }
- }
- }
-
- @Override
- public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
- return runtimeMgr != null ? runtimeMgr.getSuperFeedManager() : null;
- }
-
- @Override
- public String toString() {
- return "FeedManager " + "[" + nodeId + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index 9b00322..ca50c83 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.metadata.feeds;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -29,20 +30,20 @@ public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperato
private static final long serialVersionUID = 1L;
- private final FeedConnectionId feedId;
+ private final FeedConnectionId connectionId;
private final IFeedMessage feedMessage;
- public FeedMessageOperatorDescriptor(JobSpecification spec, String dataverse, String feedName, String dataset,
+ public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
IFeedMessage feedMessage) {
super(spec, 0, 1);
- this.feedId = new FeedConnectionId(dataverse, feedName, dataset);
+ this.connectionId = connectionId;
this.feedMessage = feedMessage;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessage, partition, nPartitions);
+ return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition, nPartitions);
}
}