You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:07 UTC
[13/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
new file mode 100644
index 0000000..0749f82
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class RemoteSocketMessageListener {
+
+ private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());
+
+ private final String host;
+ private final int port;
+ private final LinkedBlockingQueue<String> outbox;
+ private final ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ private RemoteMessageListenerServer listenerServer;
+
+ public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
+ this.host = host;
+ this.port = port;
+ this.outbox = outbox;
+ }
+
+ public void stop() {
+ if (!executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
+ listenerServer.stop();
+
+ }
+
+ public void start() throws IOException {
+ listenerServer = new RemoteMessageListenerServer(host, port, outbox);
+ executorService.execute(listenerServer);
+ }
+
+ private static class RemoteMessageListenerServer implements Runnable {
+
+ private final String host;
+ private final int port;
+ private final LinkedBlockingQueue<String> outbox;
+ private Socket client;
+
+ public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
+ this.host = host;
+ this.port = port;
+ this.outbox = outbox;
+ }
+
+ public void stop() {
+ try {
+ client.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void run() {
+ char EOL = (char) "\n".getBytes()[0];
+ Socket client = null;
+ try {
+ client = new Socket(host, port);
+ InputStream in = client.getInputStream();
+ CharBuffer buffer = CharBuffer.allocate(5000);
+ char ch;
+ while (true) {
+ ch = (char) in.read();
+ if ((ch) == -1) {
+ break;
+ }
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array());
+ synchronized (outbox) {
+ outbox.add(s + "\n");
+ }
+ buffer.position(0);
+ buffer.limit(5000);
+ }
+
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start Remote Message listener" + client);
+ }
+ } finally {
+ if (client != null && !client.isClosed()) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ public static interface IMessageAnalyzer {
+
+ /**
+ * @return
+ */
+ public LinkedBlockingQueue<String> getMessageQueue();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java
new file mode 100644
index 0000000..62aba04
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message indicating the need to scale in a stage of the feed ingestion pipeline.
+ * Currently, scaling-in of the compute stage is supported.
+ **/
+public class ScaleInReportMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+
+ private final FeedRuntimeType runtimeType;
+
+ private int currentCardinality;
+
+ private int reducedCardinaliy;
+
+ public ScaleInReportMessage(FeedConnectionId connectionId, FeedRuntimeType runtimeType, int currentCardinality,
+ int reducedCardinaliy) {
+ super(MessageType.SCALE_IN_REQUEST);
+ this.connectionId = connectionId;
+ this.runtimeType = runtimeType;
+ this.currentCardinality = currentCardinality;
+ this.reducedCardinaliy = reducedCardinaliy;
+ }
+
+ @Override
+ public String toString() {
+ return MessageType.SCALE_IN_REQUEST.name() + " " + connectionId + " [" + runtimeType + "] "
+ + " currentCardinality " + currentCardinality + " reducedCardinality " + reducedCardinaliy;
+ }
+
+ public FeedRuntimeType getRuntimeType() {
+ return runtimeType;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeType);
+ obj.put(FeedConstants.MessageConstants.CURRENT_CARDINALITY, currentCardinality);
+ obj.put(FeedConstants.MessageConstants.REDUCED_CARDINALITY, reducedCardinaliy);
+ return obj;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public static ScaleInReportMessage read(JSONObject obj) throws JSONException {
+ FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+ obj.getString(FeedConstants.MessageConstants.FEED));
+ FeedConnectionId connectionId = new FeedConnectionId(feedId,
+ obj.getString(FeedConstants.MessageConstants.DATASET));
+ FeedRuntimeType runtimeType = FeedRuntimeType.valueOf(obj
+ .getString(FeedConstants.MessageConstants.RUNTIME_TYPE));
+ return new ScaleInReportMessage(connectionId, runtimeType,
+ obj.getInt(FeedConstants.MessageConstants.CURRENT_CARDINALITY),
+ obj.getInt(FeedConstants.MessageConstants.REDUCED_CARDINALITY));
+ }
+
+ public void reset(int currentCardinality, int reducedCardinaliy) {
+ this.currentCardinality = currentCardinality;
+ this.reducedCardinaliy = reducedCardinaliy;
+ }
+
+ public int getCurrentCardinality() {
+ return currentCardinality;
+ }
+
+ public void setCurrentCardinality(int currentCardinality) {
+ this.currentCardinality = currentCardinality;
+ }
+
+ public int getReducedCardinaliy() {
+ return reducedCardinaliy;
+ }
+
+ public void setReducedCardinaliy(int reducedCardinaliy) {
+ this.reducedCardinaliy = reducedCardinaliy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java
new file mode 100644
index 0000000..a1a0ad5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IMessageReceiver;
+
+/**
+ * Listens for messages at a configured port and redirects them to a
+ * an instance of {@code IMessageReceiver}.
+ * Messages may arrive in parallel from multiple senders. Each sender is handled by
+ * a respective instance of {@code ClientHandler}.
+ */
+public class SocketMessageListener {
+
+ private static final Logger LOGGER = Logger.getLogger(SocketMessageListener.class.getName());
+
+ private final IMessageReceiver<String> messageReceiver;
+ private final MessageListenerServer listenerServer;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ public SocketMessageListener(int port, IMessageReceiver<String> messageReceiver) {
+ this.messageReceiver = messageReceiver;
+ this.listenerServer = new MessageListenerServer(port, messageReceiver);
+ }
+
+ public void stop() throws IOException {
+ listenerServer.stop();
+ messageReceiver.close(false);
+ if (!executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
+ }
+
+ public void start() {
+ messageReceiver.start();
+ executorService.execute(listenerServer);
+ }
+
+ private static class MessageListenerServer implements Runnable {
+
+ private final int port;
+ private final IMessageReceiver<String> messageReceiver;
+ private ServerSocket server;
+ private final Executor executor;
+
+ public MessageListenerServer(int port, IMessageReceiver<String> messageReceiver) {
+ this.port = port;
+ this.messageReceiver = messageReceiver;
+ this.executor = Executors.newCachedThreadPool();
+ }
+
+ public void stop() throws IOException {
+ server.close();
+ }
+
+ @Override
+ public void run() {
+ Socket client = null;
+ try {
+ server = new ServerSocket(port);
+ while (true) {
+ client = server.accept();
+ ClientHandler handler = new ClientHandler(client, messageReceiver);
+ executor.execute(handler);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start Message listener" + server);
+ }
+ } finally {
+ if (server != null) {
+ try {
+ server.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private static class ClientHandler implements Runnable {
+
+ private static final char EOL = (char) "\n".getBytes()[0];
+
+ private final Socket client;
+ private final IMessageReceiver<String> messageReceiver;
+
+ public ClientHandler(Socket client, IMessageReceiver<String> messageReceiver) {
+ this.client = client;
+ this.messageReceiver = messageReceiver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ InputStream in = client.getInputStream();
+ CharBuffer buffer = CharBuffer.allocate(5000);
+ char ch;
+ while (true) {
+ ch = (char) in.read();
+ if ((ch) == -1) {
+ break;
+ }
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array(), 0, buffer.limit());
+ messageReceiver.sendMessage(s + "\n");
+ buffer.position(0);
+ buffer.limit(5000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to process mesages from client" + client);
+ }
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java
new file mode 100644
index 0000000..68ce74d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedConstants.MessageConstants;
+
+/**
+ * A feed control message sent from a storage runtime of a feed pipeline to report the intake timestamp corresponding
+ * to the last persisted tuple.
+ */
+public class StorageReportFeedMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+ private final int partition;
+ private long lastPersistedTupleIntakeTimestamp;
+ private boolean persistenceDelayWithinLimit;
+ private long averageDelay;
+ private int intakePartition;
+
+ public StorageReportFeedMessage(FeedConnectionId connectionId, int partition,
+ long lastPersistedTupleIntakeTimestamp, boolean persistenceDelayWithinLimit, long averageDelay,
+ int intakePartition) {
+ super(MessageType.STORAGE_REPORT);
+ this.connectionId = connectionId;
+ this.partition = partition;
+ this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+ this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+ this.averageDelay = averageDelay;
+ this.intakePartition = intakePartition;
+ }
+
+ @Override
+ public String toString() {
+ return messageType.name() + " " + connectionId + " [" + lastPersistedTupleIntakeTimestamp + "] ";
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public long getLastPersistedTupleIntakeTimestamp() {
+ return lastPersistedTupleIntakeTimestamp;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public boolean isPersistenceDelayWithinLimit() {
+ return persistenceDelayWithinLimit;
+ }
+
+ public void setPersistenceDelayWithinLimit(boolean persistenceDelayWithinLimit) {
+ this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+ }
+
+ public long getAverageDelay() {
+ return averageDelay;
+ }
+
+ public void setAverageDelay(long averageDelay) {
+ this.averageDelay = averageDelay;
+ }
+
+ public int getIntakePartition() {
+ return intakePartition;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP, lastPersistedTupleIntakeTimestamp);
+ obj.put(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT, persistenceDelayWithinLimit);
+ obj.put(MessageConstants.AVERAGE_PERSISTENCE_DELAY, averageDelay);
+ obj.put(FeedConstants.MessageConstants.PARTITION, partition);
+ obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+
+ return obj;
+ }
+
+ public static StorageReportFeedMessage read(JSONObject obj) throws JSONException {
+ FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+ obj.getString(FeedConstants.MessageConstants.FEED));
+ FeedConnectionId connectionId = new FeedConnectionId(feedId,
+ obj.getString(FeedConstants.MessageConstants.DATASET));
+ int partition = obj.getInt(FeedConstants.MessageConstants.PARTITION);
+ long timestamp = obj.getLong(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP);
+ boolean persistenceDelayWithinLimit = obj.getBoolean(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT);
+ long averageDelay = obj.getLong(MessageConstants.AVERAGE_PERSISTENCE_DELAY);
+ int intakePartition = obj.getInt(MessageConstants.INTAKE_PARTITION);
+ return new StorageReportFeedMessage(connectionId, partition, timestamp, persistenceDelayWithinLimit,
+ averageDelay, intakePartition);
+ }
+
+ public void reset(long lastPersistedTupleIntakeTimestamp, boolean delayWithinLimit, long averageDelay) {
+ this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+ this.persistenceDelayWithinLimit = delayWithinLimit;
+ this.averageDelay = averageDelay;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java
new file mode 100644
index 0000000..ab77840
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedConstants;
+
+public class TerminateDataFlowMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+
+ public TerminateDataFlowMessage(FeedConnectionId connectionId) {
+ super(MessageType.TERMINATE_FLOW);
+ this.connectionId = connectionId;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ return obj;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java
new file mode 100644
index 0000000..0459310
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message indicating the need to end the feed. This message is dispatched
+ * to all locations that host an operator involved in the feed pipeline.
+ */
+public class ThrottlingEnabledFeedMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+
+ private final FeedRuntimeId runtimeId;
+
+ public ThrottlingEnabledFeedMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId) {
+ super(MessageType.THROTTLING_ENABLED);
+ this.connectionId = connectionId;
+ this.runtimeId = runtimeId;
+ }
+
+ @Override
+ public String toString() {
+ return MessageType.END.name() + " " + connectionId + " [" + runtimeId + "] ";
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+ obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
+ obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+ return obj;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public FeedRuntimeId getFeedRuntimeId() {
+ return runtimeId;
+ }
+
+ public static ThrottlingEnabledFeedMessage read(JSONObject obj) throws JSONException {
+ FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+ obj.getString(FeedConstants.MessageConstants.FEED));
+ FeedConnectionId connectionId = new FeedConnectionId(feedId,
+ obj.getString(FeedConstants.MessageConstants.DATASET));
+ FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+ .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+ obj.getInt(FeedConstants.MessageConstants.PARTITION),
+ obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
+ return new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java
new file mode 100644
index 0000000..cef3fa9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message indicating the need to execute a give AQL.
+ */
+public class XAQLFeedMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String aql;
+ private final FeedConnectionId connectionId;
+
+ public XAQLFeedMessage(FeedConnectionId connectionId, String aql) {
+ super(MessageType.XAQL);
+ this.connectionId = connectionId;
+ this.aql = aql;
+ }
+
+ @Override
+ public String toString() {
+ return messageType.name() + " " + connectionId + " [" + aql + "] ";
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public String getAql() {
+ return aql;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.AQL, aql);
+ return obj;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java
new file mode 100644
index 0000000..da5907c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.policy;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class FeedPolicy implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final String dataverseName;
+ // Enforced to be unique within a dataverse.
+ private final String policyName;
+ // A description of the policy
+ private final String description;
+ // The policy properties associated with the feed dataset
+ private Map<String, String> properties;
+
+ public FeedPolicy(String dataverseName, String policyName, String description, Map<String, String> properties) {
+ this.dataverseName = dataverseName;
+ this.policyName = policyName;
+ this.description = description;
+ this.properties = properties;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getPolicyName() {
+ return policyName;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FeedPolicy)) {
+ return false;
+ }
+ FeedPolicy otherPolicy = (FeedPolicy) other;
+ if (!otherPolicy.dataverseName.equals(dataverseName)) {
+ return false;
+ }
+ if (!otherPolicy.policyName.equals(policyName)) {
+ return false;
+ }
+ return true;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
new file mode 100644
index 0000000..077a58d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.policy;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A utility class to access the configuration parameters of a feed ingestion policy.
+ */
+public class FeedPolicyAccessor implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * --------------------------
+ * failure configuration
+ * --------------------------
+ **/
+
+ /** continue feed ingestion after a soft (runtime) failure **/
+ public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
+
+ /** log failed tuple to an asterixdb dataset for future reference **/
+ public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
+
+ /** continue feed ingestion after loss of one or more machines (hardware failure) **/
+ public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
+
+ /** auto-start a loser feed when the asterixdb instance is restarted **/
+ public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
+
+ /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
+ public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
+
+ /**
+ * --------------------------
+ * flow control configuration
+ * --------------------------
+ **/
+
+ /** enable buffering in feeds **/
+ public static final String BUFFERING_ENABLED = "buffering.enabled";
+
+ /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
+ public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
+
+ /** the maximum size of data (tuples) that can be spilled to disk **/
+ public static final String MAX_SPILL_SIZE_ON_DISK = "max.spill.size.on.disk";
+
+ /** discard tuples altogether if an operator cannot process incoming data at its arrival rate **/
+ public static final String DISCARD_ON_CONGESTION = "discard.on.congestion";
+
+ /** maximum fraction of ingested data that can be discarded **/
+ public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
+
+ /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
+ public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
+
+ /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
+ public static final String THROTTLING_ENABLED = "throttling.enabled";
+
+ /** elasticity **/
+ public static final String ELASTIC = "elastic";
+
+ /** statistics **/
+ public static final String TIME_TRACKING = "time.tracking";
+
+ /** logging of statistics **/
+ public static final String LOGGING_STATISTICS = "logging.statistics";
+
+ public static final long NO_LIMIT = -1;
+
+ private Map<String, String> feedPolicy;
+
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
+
+ public FeedPolicyAccessor(Map<String, String> feedPolicy) {
+ this.feedPolicy = feedPolicy;
+ }
+
+ public void reset(Map<String, String> feedPolicy) {
+ this.feedPolicy = feedPolicy;
+ }
+
+ /** Failure recover/reporting **/
+
+ public boolean logDataOnSoftFailure() {
+ return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
+ }
+
+ public boolean continueOnSoftFailure() {
+ return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
+ }
+
+ public boolean continueOnHardwareFailure() {
+ return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
+ }
+
+ public boolean autoRestartOnClusterReboot() {
+ return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
+ }
+
+ public boolean atleastOnceSemantics() {
+ return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
+ }
+
+ /** flow control **/
+ public boolean bufferingEnabled() {
+ return getBooleanPropertyValue(BUFFERING_ENABLED, false);
+ }
+
+ public boolean spillToDiskOnCongestion() {
+ return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
+ }
+
+ public boolean discardOnCongestion() {
+ return getMaxFractionDiscard() > 0;
+ }
+
+ public boolean throttlingEnabled() {
+ return getBooleanPropertyValue(THROTTLING_ENABLED, false);
+ }
+
+ public long getMaxSpillOnDisk() {
+ return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
+ }
+
+ public float getMaxFractionDiscard() {
+ return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
+ }
+
+ public long getMaxDelayRecordPersistence() {
+ return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
+ }
+
+ /** Elasticity **/
+ public boolean isElastic() {
+ return getBooleanPropertyValue(ELASTIC, false);
+ }
+
+ /** Statistics **/
+ public boolean isTimeTrackingEnabled() {
+ return getBooleanPropertyValue(TIME_TRACKING, false);
+ }
+
+ /** Logging of statistics **/
+ public boolean isLoggingStatisticsEnabled() {
+ return getBooleanPropertyValue(LOGGING_STATISTICS, false);
+ }
+
+ private boolean getBooleanPropertyValue(String key, boolean defValue) {
+ String v = feedPolicy.get(key);
+ return v == null ? false : Boolean.valueOf(v);
+ }
+
+ private long getLongPropertyValue(String key, long defValue) {
+ String v = feedPolicy.get(key);
+ return v != null ? Long.parseLong(v) : defValue;
+ }
+
+ private float getFloatPropertyValue(String key, float defValue) {
+ String v = feedPolicy.get(key);
+ return v != null ? Float.parseFloat(v) : defValue;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
new file mode 100644
index 0000000..e0944ad
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.policy;
+
+import java.rmi.RemoteException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+
+public class FeedPolicyEnforcer {
+
+ private final FeedConnectionId connectionId;
+ private final FeedPolicyAccessor policyAccessor;
+
+ public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
+ this.connectionId = feedConnectionId;
+ this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
+ }
+
+ public boolean continueIngestionPostSoftwareFailure(Exception e) throws RemoteException, ACIDException {
+ return policyAccessor.continueOnSoftFailure();
+ }
+
+ public FeedPolicyAccessor getFeedPolicyAccessor() {
+ return policyAccessor;
+ }
+
+ public FeedConnectionId getFeedId() {
+ return connectionId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
new file mode 100644
index 0000000..3ac28c9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * The class in charge of executing feed adapters.
+ */
+public class AdapterExecutor implements Runnable {
+
+ private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
+
+ private final DistributeFeedFrameWriter writer; // A writer that sends frames to multiple receivers (that can
+ // increase or decrease at any time)
+ private final IFeedAdapter adapter; // The adapter
+ private final IAdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
+
+ public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IFeedAdapter adapter,
+ IAdapterRuntimeManager adapterManager) {
+ this.writer = writer;
+ this.adapter = adapter;
+ this.adapterManager = adapterManager;
+ }
+
+ @Override
+ public void run() {
+ // Start by getting the partition number from the manager
+ int partition = adapterManager.getPartition();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Starting ingestion for partition:" + partition);
+ }
+ boolean continueIngestion = true;
+ boolean failedIngestion = false;
+ while (continueIngestion) {
+ try {
+ // Start the adapter
+ adapter.start(partition, writer);
+ // Adapter has completed execution
+ continueIngestion = false;
+ } catch (Exception e) {
+ LOGGER.error("Exception during feed ingestion ", e);
+ // Check if the adapter wants to continue ingestion
+ if (ExternalDataExceptionUtils.isResolvable(e)) {
+ continueIngestion = adapter.handleException(e);
+ } else {
+ continueIngestion = false;
+ }
+ failedIngestion = !continueIngestion;
+ }
+ }
+ // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying the
+ // runtime manager
+ adapterManager.setState(failedIngestion ? State.FAILED_INGESTION : State.FINISHED_INGESTION);
+ synchronized (adapterManager) {
+ adapterManager.notifyAll();
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
new file mode 100644
index 0000000..6c3e44d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.log4j.Logger;
+
+/**
+ * This class manages the execution of an adapter within a feed
+ */
+public class AdapterRuntimeManager implements IAdapterRuntimeManager {
+
+ private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
+
+ private final FeedId feedId; // (dataverse-feed)
+
+ private final IFeedAdapter feedAdapter; // The adapter
+
+ private final IIntakeProgressTracker tracker; // Not used. needs to be fixed soon.
+
+ private final AdapterExecutor adapterExecutor; // The executor for the adapter <-- two way visibility -->
+
+ private final int partition; // The partition number
+
+ private final ExecutorService executorService; // Executor service to run/shutdown the adapter executor
+
+ private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed <-- two way
+ // visibility -->
+
+ private State state; // One of {ACTIVE_INGESTION, NACTIVE_INGESTION, FINISHED_INGESTION,
+ // FAILED_INGESTION}
+
+ public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, IIntakeProgressTracker tracker,
+ DistributeFeedFrameWriter writer, int partition) {
+ this.feedId = feedId;
+ this.feedAdapter = feedAdapter;
+ this.tracker = tracker;
+ this.partition = partition;
+ this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+ this.executorService = Executors.newSingleThreadExecutor();
+ this.state = State.INACTIVE_INGESTION;
+ }
+
+ @Override
+ public void start() throws Exception {
+ state = State.ACTIVE_INGESTION;
+ executorService.execute(adapterExecutor);
+ }
+
+ @Override
+ public void stop() {
+ boolean stopped = false;
+ try {
+ stopped = feedAdapter.stop();
+ } catch (Exception exception) {
+ LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
+ } finally {
+ state = State.FINISHED_INGESTION;
+ if (stopped) {
+ // stop() returned true, we wait for the process termination
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+ }
+ } else {
+ // stop() returned false, we try to force shutdown
+ executorService.shutdownNow();
+ }
+
+ }
+ }
+
+ @Override
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ @Override
+ public String toString() {
+ return feedId + "[" + partition + "]";
+ }
+
+ @Override
+ public IFeedAdapter getFeedAdapter() {
+ return feedAdapter;
+ }
+
+ public IIntakeProgressTracker getTracker() {
+ return tracker;
+ }
+
+ @Override
+ public synchronized State getState() {
+ return state;
+ }
+
+ @Override
+ public synchronized void setState(State state) {
+ this.state = state;
+ }
+
+ public AdapterExecutor getAdapterExecutor() {
+ return adapterExecutor;
+ }
+
+ @Override
+ public int getPartition() {
+ return partition;
+ }
+
+ public IngestionRuntime getIngestionRuntime() {
+ return ingestionRuntime;
+ }
+
+ @Override
+ public IIntakeProgressTracker getProgressTracker() {
+ return tracker;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
new file mode 100644
index 0000000..967dc3e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.Map;
+
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+/**
+ * Represents the feed runtime that collects feed tuples from another feed.
+ * In case of a primary feed, the CollectionRuntime collects tuples from the feed
+ * intake job. For a secondary feed, tuples are collected from the intake/compute
+ * runtime associated with the source feed.
+ */
+public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
+
+ private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset]
+ private final ISubscribableRuntime sourceRuntime; // Runtime that provides the data
+ private final Map<String, String> feedPolicy; // Policy associated with the feed
+ private FeedFrameCollector frameCollector; // Collector that can be plugged into a frame distributor
+
+ public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
+ Map<String, String> feedPolicy) {
+ super(runtimeId, inputSideHandler, outputSideWriter);
+ this.connectionId = connectionId;
+ this.sourceRuntime = sourceRuntime;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public State waitTillCollectionOver() throws InterruptedException {
+ if (!(isCollectionOver())) {
+ synchronized (frameCollector) {
+ while (!isCollectionOver()) {
+ frameCollector.wait();
+ }
+ }
+ }
+ return frameCollector.getState();
+ }
+
+ private boolean isCollectionOver() {
+ return frameCollector.getState().equals(FeedFrameCollector.State.FINISHED)
+ || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
+ }
+
+ @Override
+ public void setMode(Mode mode) {
+ getInputHandler().setMode(mode);
+ }
+
+ @Override
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public ISubscribableRuntime getSourceRuntime() {
+ return sourceRuntime;
+ }
+
+ public void setFrameCollector(FeedFrameCollector frameCollector) {
+ this.frameCollector = frameCollector;
+ }
+
+ @Override
+ public FeedFrameCollector getFrameCollector() {
+ return frameCollector;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
new file mode 100644
index 0000000..76b1b19
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
+import org.apache.asterix.external.feed.api.IFeedRuntime;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+public class FeedRuntime implements IFeedRuntime {
+
+ /** A unique identifier for the runtime **/
+ protected final FeedRuntimeId runtimeId;
+
+ /** The output frame writer associated with the runtime **/
+ protected IFrameWriter frameWriter;
+
+ /** The pre-processor associated with the runtime **/
+ protected FeedRuntimeInputHandler inputHandler;
+
+ public FeedRuntime(FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter) {
+ this.runtimeId = runtimeId;
+ this.frameWriter = frameWriter;
+ this.inputHandler = inputHandler;
+ }
+
+ public void setFrameWriter(IFeedOperatorOutputSideHandler frameWriter) {
+ this.frameWriter = frameWriter;
+ }
+
+ @Override
+ public FeedRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ @Override
+ public IFrameWriter getFeedFrameWriter() {
+ return frameWriter;
+ }
+
+ @Override
+ public String toString() {
+ return runtimeId.toString();
+ }
+
+ @Override
+ public FeedRuntimeInputHandler getInputHandler() {
+ return inputHandler;
+ }
+
+ public Mode getMode() {
+ return inputHandler != null ? inputHandler.getMode() : Mode.PROCESS;
+ }
+
+ public void setMode(Mode mode) {
+ this.inputHandler.setMode(mode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
new file mode 100644
index 0000000..45d8afe
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.io.Serializable;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+
+public class FeedRuntimeId implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String DEFAULT_OPERAND_ID = "N/A";
+
+ private final FeedRuntimeType runtimeType;
+ private final int partition;
+ private final String operandId;
+
+ public FeedRuntimeId(FeedRuntimeType runtimeType, int partition, String operandId) {
+ this.runtimeType = runtimeType;
+ this.partition = partition;
+ this.operandId = operandId;
+ }
+
+ @Override
+ public String toString() {
+ return runtimeType + "[" + partition + "]" + "{" + operandId + "}";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FeedRuntimeId)) {
+ return false;
+ }
+ FeedRuntimeId other = (FeedRuntimeId) o;
+ return (other.getFeedRuntimeType().equals(runtimeType) && other.getOperandId().equals(operandId) && other
+ .getPartition() == partition);
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public FeedRuntimeType getFeedRuntimeType() {
+ return runtimeType;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public FeedRuntimeType getRuntimeType() {
+ return runtimeType;
+ }
+
+ public String getOperandId() {
+ return operandId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
new file mode 100644
index 0000000..fd6fcb3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.logging.Level;
+
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FrameDistributor;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class IngestionRuntime extends SubscribableRuntime {
+
+ private final IAdapterRuntimeManager adapterRuntimeManager;
+
+ public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+ RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+ super(feedId, runtimeId, null, feedWriter, recordDesc);
+ this.adapterRuntimeManager = adaptorRuntimeManager;
+ }
+
+ @Override
+ public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
+ FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
+ collectionRuntime.getConnectionId());
+ collectionRuntime.setFrameCollector(reader);
+
+ if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+ adapterRuntimeManager.start();
+ }
+ subscribers.add(collectionRuntime);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
+ }
+ }
+
+ @Override
+ public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+ if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
+ }
+ adapterRuntimeManager.stop();
+ } else {
+ dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
+ }
+ subscribers.remove(collectionRuntime);
+ }
+
+ public void endOfFeed() {
+ dWriter.notifyEndOfFeed();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Notified End Of Feed [" + this + "]");
+ }
+ }
+
+ public IAdapterRuntimeManager getAdapterRuntimeManager() {
+ return adapterRuntimeManager;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java
new file mode 100644
index 0000000..f6db99c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedId;
+
+public class SubscribableFeedRuntimeId extends FeedRuntimeId {
+ private static final long serialVersionUID = 1L;
+ private final FeedId feedId;
+
+ public SubscribableFeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition) {
+ super(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+ this.feedId = feedId;
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SubscribableFeedRuntimeId)) {
+ return false;
+ }
+
+ return (super.equals(o) && this.feedId.equals(((SubscribableFeedRuntimeId) o).getFeedId()));
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() + feedId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
new file mode 100644
index 0000000..056875c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
+
+ protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
+
+ protected final FeedId feedId;
+ protected final List<ISubscriberRuntime> subscribers;
+ protected final RecordDescriptor recordDescriptor;
+ protected final DistributeFeedFrameWriter dWriter;
+
+ public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler,
+ DistributeFeedFrameWriter dWriter, RecordDescriptor recordDescriptor) {
+ super(runtimeId, inputHandler, dWriter);
+ this.feedId = feedId;
+ this.recordDescriptor = recordDescriptor;
+ this.dWriter = dWriter;
+ this.subscribers = new ArrayList<ISubscriberRuntime>();
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ @Override
+ public String toString() {
+ return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
+ }
+
+ @Override
+ public synchronized void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime)
+ throws Exception {
+ FeedFrameCollector collector = dWriter.subscribeFeed(new FeedPolicyAccessor(collectionRuntime.getFeedPolicy()),
+ collectionRuntime.getInputHandler(), collectionRuntime.getConnectionId());
+ collectionRuntime.setFrameCollector(collector);
+ subscribers.add(collectionRuntime);
+ }
+
+ @Override
+ public synchronized void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+ dWriter.unsubscribeFeed(collectionRuntime.getFeedFrameWriter());
+ subscribers.remove(collectionRuntime);
+ }
+
+ @Override
+ public synchronized List<ISubscriberRuntime> getSubscribers() {
+ return subscribers;
+ }
+
+ @Override
+ public DistributeFeedFrameWriter getFeedFrameWriter() {
+ return dWriter;
+ }
+
+ public FeedRuntimeType getFeedRuntimeType() {
+ return runtimeId.getFeedRuntimeType();
+ }
+
+ @Override
+ public RecordDescriptor getRecordDescriptor() {
+ return recordDescriptor;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java
new file mode 100644
index 0000000..ad40608
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class BasicMonitoredBuffer extends MonitoredBuffer {
+
+ public BasicMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter, FrameTupleAccessor fta,
+ RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+ FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+ IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+ exceptionHandler, callback, nPartitions, policyAccessor);
+ }
+
+ @Override
+ protected boolean monitorProcessingRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean logInflowOutflowRate() {
+ return false;
+ }
+
+ @Override
+ protected IFramePreprocessor getFramePreProcessor() {
+ return null;
+ }
+
+ @Override
+ protected IFramePostProcessor getFramePostProcessor() {
+ return null;
+ }
+
+ @Override
+ protected boolean monitorInputQueueLength() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportInflowRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportOutflowRate() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java
new file mode 100644
index 0000000..211fc7b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class ComputeSideMonitoredBuffer extends MonitoredBuffer {
+
+ public ComputeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+ FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+ FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+ IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+ exceptionHandler, callback, nPartitions, policyAccessor);
+ }
+
+ @Override
+ protected boolean monitorProcessingRate() {
+ return true;
+ }
+
+ protected boolean logInflowOutflowRate() {
+ return true;
+ }
+
+ @Override
+ protected boolean monitorInputQueueLength() {
+ return true;
+ }
+
+ @Override
+ protected IFramePreprocessor getFramePreProcessor() {
+ return null;
+ }
+
+ @Override
+ protected IFramePostProcessor getFramePostProcessor() {
+ return null;
+ }
+
+ @Override
+ protected boolean reportOutflowRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportInflowRate() {
+ return false;
+ }
+
+}