You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:09 UTC
[15/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
new file mode 100644
index 0000000..6ad00f1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedFrameHandler;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedFrameHandlers {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
+
+ public enum RoutingMode { // selects which IFeedFrameHandler implementation getFeedFrameHandler() builds
+ IN_MEMORY_ROUTE, // mapped to InMemoryRouter
+ SPILL_TO_DISK, // mapped to DiskSpiller
+ DISCARD // mapped to DiscardRouter
+ }
+
+    /**
+     * Creates the frame handler that implements the requested routing mode.
+     *
+     * @param distributor the distributor whose registered readers / frame accessor the handler uses
+     * @param feedId the feed the handler works for
+     * @param routingMode the routing strategy to instantiate
+     * @param runtimeType the type of the feed runtime the handler is attached to
+     * @param partition the partition of the feed runtime
+     * @param frameSize the frame size, used by the disk spiller to size its replay buffer
+     * @return a handler matching {@code routingMode}
+     * @throws IOException if the handler cannot be created
+     * @throws IllegalArgumentException if {@code routingMode} is not a known mode
+     */
+    public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
+            RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
+        switch (routingMode) {
+            case IN_MEMORY_ROUTE:
+                return new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
+            case SPILL_TO_DISK:
+                return new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
+            case DISCARD:
+                return new DiscardRouter(distributor, feedId, runtimeType, partition);
+            default:
+                // Separator added so the offending mode is readable in the message.
+                throw new IllegalArgumentException("Invalid routing mode " + routingMode);
+        }
+    }
+
+    /**
+     * Frame handler that drops everything it is given and only keeps a running count of what was dropped.
+     * <p>
+     * Note that the counter mixes units: {@link #handleFrame(ByteBuffer)} adds the number of tuples in the
+     * frame, while {@link #handleDataBucket(DataBucket)} adds one per bucket.
+     */
+    public static class DiscardRouter implements IFeedFrameHandler {
+
+        private final FeedId feedId;
+        private int nDiscarded;
+        private final FeedRuntimeType runtimeType;
+        private final int partition;
+        private final FrameDistributor distributor;
+
+        public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
+                throws HyracksDataException {
+            this.distributor = distributor;
+            this.feedId = feedId;
+            this.nDiscarded = 0;
+            this.runtimeType = runtimeType;
+            this.partition = partition;
+        }
+
+        /**
+         * Discards the frame, adding its tuple count to the discard counter.
+         */
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            // The distributor's accessor is reused to count the tuples in the dropped frame.
+            FrameTupleAccessor fta = distributor.getFta();
+            fta.reset(frame);
+            int nTuples = fta.getTupleCount();
+            nDiscarded += nTuples;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + " " + nTuples);
+            }
+        }
+
+        /**
+         * Discards the bucket, incrementing the discard counter by one.
+         */
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            nDiscarded++;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Discard Count" + nDiscarded);
+            }
+        }
+
+        @Override
+        public void close() {
+            // do nothing, no resource to relinquish
+        }
+
+        /**
+         * Not supported: discarded data cannot be replayed.
+         *
+         * @throws IllegalStateException always
+         */
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            throw new IllegalStateException("Invalid operation");
+        }
+
+        @Override
+        public String toString() {
+            return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
+        }
+
+        /**
+         * @return a one-line summary containing the feed id and the current discard counter
+         */
+        @Override
+        public String getSummary() {
+            // The concatenation already yields a fresh String; wrapping it in new String(...) was redundant.
+            return "Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
+                    + nDiscarded + ")";
+        }
+
+    }
+
+    /**
+     * Frame handler that forwards every data bucket to all frame collectors it was constructed with.
+     * Only bucket-based routing is supported; frame handling and replay are rejected.
+     */
+    public static class InMemoryRouter implements IFeedFrameHandler {
+
+        private final Collection<FeedFrameCollector> frameCollectors;
+
+        /**
+         * @param frameCollectors the collectors that receive each bucket
+         * @param runtimeType unused by this handler
+         * @param partition unused by this handler
+         */
+        public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
+                int partition) {
+            this.frameCollectors = frameCollectors;
+        }
+
+        /**
+         * @throws IllegalStateException always; raw frames are not routed in memory
+         */
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        /**
+         * Hands the bucket to each registered collector in iteration order.
+         */
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            Iterator<FeedFrameCollector> targets = frameCollectors.iterator();
+            while (targets.hasNext()) {
+                // asynchronous call
+                targets.next().sendMessage(bucket);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+
+        /**
+         * @throws IllegalStateException always; nothing is retained for replay
+         */
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        @Override
+        public String getSummary() {
+            return "InMemoryRouter Summary";
+        }
+    }
+
+ public static class DiskSpiller implements IFeedFrameHandler {
+
+ private FrameSpiller<ByteBuffer> receiver;
+ private Iterator<ByteBuffer> iterator;
+
+ public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
+ int frameSize) throws IOException {
+ receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
+ }
+
+ @Override
+ public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+ receiver.sendMessage(frame);
+ }
+
+ private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
+
+ private final FeedId feedId;
+ private BufferedOutputStream bos;
+ private final ByteBuffer reusableLengthBuffer;
+ private final ByteBuffer reusableDataBuffer;
+ private long offset;
+ private File file;
+ private final FrameDistributor frameDistributor;
+ private boolean fileCreated = false;
+
+ public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
+ this.feedId = feedId;
+ this.frameDistributor = distributor;
+ reusableLengthBuffer = ByteBuffer.allocate(4);
+ reusableDataBuffer = ByteBuffer.allocate(frameSize);
+ this.offset = 0;
+ }
+
+ @Override
+ public void processMessage(ByteBuffer message) throws Exception {
+ if (!fileCreated) {
+ createFile();
+ fileCreated = true;
+ }
+ reusableLengthBuffer.flip();
+ reusableLengthBuffer.putInt(message.array().length);
+ bos.write(reusableLengthBuffer.array());
+ bos.write(message.array());
+ }
+
+ private void createFile() throws IOException {
+ Date date = new Date();
+ String dateSuffix = date.toString().replace(' ', '_');
+ String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
+ + frameDistributor.getPartition() + "_" + dateSuffix;
+
+ file = new File(fileName);
+ if (!file.exists()) {
+ boolean success = file.createNewFile();
+ if (!success) {
+ throw new IOException("Unable to create spill file for feed " + feedId);
+ }
+ }
+ bos = new BufferedOutputStream(new FileOutputStream(file));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Created Spill File for feed " + feedId);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ public Iterator<ByteBuffer> replayData() throws Exception {
+ final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
+ bis.skip(offset);
+ return new Iterator<ByteBuffer>() {
+
+ @Override
+ public boolean hasNext() {
+ boolean more = false;
+ try {
+ more = bis.available() > 0;
+ if (!more) {
+ bis.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return more;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ reusableLengthBuffer.flip();
+ try {
+ bis.read(reusableLengthBuffer.array());
+ reusableLengthBuffer.flip();
+ int frameSize = reusableLengthBuffer.getInt();
+ reusableDataBuffer.flip();
+ bis.read(reusableDataBuffer.array(), 0, frameSize);
+ offset += 4 + frameSize;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return reusableDataBuffer;
+ }
+
+ @Override
+ public void remove() {
+ }
+
+ };
+ }
+
+ }
+
+ @Override
+ public void handleDataBucket(DataBucket bucket) {
+ throw new IllegalStateException("Operation not supported");
+ }
+
+ @Override
+ public void close() {
+ receiver.close(true);
+ }
+
+ @Override
+ public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+ try {
+ iterator = receiver.replayData();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ return iterator;
+ }
+
+ //TODO: Form a summary that includes stats related to what has been spilled to disk
+ @Override
+ public String getSummary() {
+ return "Disk Spiller Summary";
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
new file mode 100644
index 0000000..c9a29ac
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Spills feed frames to a local file, subject to the spill budget of the feed policy, and replays them.
+ * <p>
+ * Frames are written back-to-back as the complete backing array of each {@link ByteBuffer}; there is no
+ * per-frame header. The spill file is created lazily on the first call to
+ * {@link #processMessage(ByteBuffer)} and is not deleted by this class.
+ */
+public class FeedFrameSpiller {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
+
+    private final IHyracksTaskContext ctx;
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final FeedPolicyAccessor policyAccessor;
+    private BufferedOutputStream bos;
+    private File file;
+    private boolean fileCreated = false;
+    private long bytesWritten = 0;
+    private int spilledFrameCount = 0;
+
+    public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            FeedPolicyAccessor policyAccessor) throws HyracksDataException {
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.policyAccessor = policyAccessor;
+    }
+
+    /**
+     * Appends a frame to the spill file unless doing so would exceed the policy's spill limit.
+     *
+     * @param message the frame to spill; its whole backing array is written
+     * @return {@code true} if the frame was written, {@code false} if the spill budget would be exceeded
+     * @throws HyracksDataException if the spill file cannot be created or written
+     */
+    public boolean processMessage(ByteBuffer message) throws HyracksDataException {
+        if (!fileCreated) {
+            createFile();
+            fileCreated = true;
+        }
+        byte[] payload = message.array();
+        long maxAllowed = policyAccessor.getMaxSpillOnDisk();
+        if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + payload.length > maxAllowed) {
+            return false;
+        }
+        try {
+            bos.write(payload);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        bytesWritten += payload.length;
+        spilledFrameCount++;
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
+        }
+        return true;
+    }
+
+    /**
+     * Creates the spill file in the working directory and opens the output stream on it. The file name is
+     * built from the feed id, dataset name, runtime type, partition and the current date.
+     */
+    private void createFile() throws HyracksDataException {
+        try {
+            Date date = new Date();
+            String dateSuffix = date.toString().replace(' ', '_');
+            String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
+                    + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
+
+            file = new File(fileName);
+            if (!file.exists()) {
+                boolean success = file.createNewFile();
+                if (!success) {
+                    throw new HyracksDataException(
+                            "Unable to create spill file " + fileName + " for feed " + runtimeId);
+                } else {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Created spill file " + file.getAbsolutePath());
+                    }
+                }
+            }
+            bos = new BufferedOutputStream(new FileOutputStream(file));
+        } catch (HyracksDataException e) {
+            // Already the right type; do not wrap it a second time.
+            throw e;
+        } catch (Exception e) {
+            // Exception rather than Throwable, so that Errors (e.g. OutOfMemoryError) propagate untouched.
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Flushes pending writes and returns an iterator over all frames in the spill file.
+     *
+     * @throws IllegalStateException if there is no open spill file (nothing spilled yet, or after
+     *             {@link #reset()})
+     * @throws Exception if flushing or opening the spill file fails
+     */
+    public Iterator<ByteBuffer> replayData() throws Exception {
+        if (bos == null) {
+            throw new IllegalStateException("No spilled frames available for replay by " + runtimeId);
+        }
+        bos.flush();
+        // getPath() keeps any directory component; getName() would drop it.
+        return new FrameIterator(ctx, file.getPath());
+    }
+
+    /**
+     * Reads the spill file back one frame at a time. Each returned frame is a newly allocated
+     * {@link VSizeFrame}; the stream is closed once {@link #hasNext()} finds no more data.
+     */
+    private static class FrameIterator implements Iterator<ByteBuffer> {
+
+        private final BufferedInputStream bis;
+        private final IHyracksTaskContext ctx;
+        private int readFrameCount = 0;
+
+        public FrameIterator(IHyracksTaskContext ctx, String filename) throws FileNotFoundException {
+            bis = new BufferedInputStream(new FileInputStream(new File(filename)));
+            this.ctx = ctx;
+        }
+
+        @Override
+        public boolean hasNext() {
+            boolean more = false;
+            try {
+                more = bis.available() > 0;
+                if (!more) {
+                    bis.close();
+                }
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, "Unable to inspect spill file", e);
+            }
+
+            return more;
+        }
+
+        /**
+         * @throws IllegalStateException if the frame cannot be allocated or read
+         */
+        @Override
+        public ByteBuffer next() {
+            try {
+                IFrame frame = new VSizeFrame(ctx);
+                byte[] data = frame.getBuffer().array();
+                Arrays.fill(data, (byte) 0);
+                int frameSize = frame.getFrameSize();
+                // read() may return fewer bytes than requested; keep reading until the frame is full
+                // or the file ends (a short final frame stays zero-padded, as before).
+                int total = 0;
+                while (total < frameSize) {
+                    int n = bis.read(data, total, frameSize - total);
+                    if (n < 0) {
+                        break;
+                    }
+                    total += n;
+                }
+                readFrameCount++;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Read spill frame " + readFrameCount);
+                }
+                return frame.getBuffer();
+            } catch (IOException e) {
+                // Previously the exception was printed and a null frame was dereferenced afterwards.
+                throw new IllegalStateException("Unable to read spilled frame " + (readFrameCount + 1), e);
+            }
+        }
+
+        @Override
+        public void remove() {
+            // intentionally a no-op, as before
+        }
+
+    }
+
+    /**
+     * Closes the current spill stream and clears the byte budget so that the next
+     * {@link #processMessage(ByteBuffer)} starts a new spill file. The old file is left on disk.
+     */
+    public void reset() {
+        // Close before dropping the reference; otherwise the file handle leaks.
+        close();
+        bytesWritten = 0;
+        // file.delete();
+        fileCreated = false;
+        bos = null;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Resetted the FrameSpiller!");
+        }
+    }
+
+    /**
+     * Flushes and closes the spill stream, if one is open. I/O failures are logged, not thrown.
+     */
+    public void close() {
+        if (bos != null) {
+            try {
+                bos.flush();
+                bos.close();
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, "Unable to close spill file for " + runtimeId, e);
+            }
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
new file mode 100644
index 0000000..f08243e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * An {@link IFrameTupleAccessor} that delegates every accessor call to a wrapped
+ * {@link FrameTupleAccessor} and additionally extracts the intake partition stored in a tuple's record.
+ */
+public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
+
+    private final FrameTupleAccessor frameAccessor;
+    private final int numOpenFields;
+
+    /**
+     * Wraps the given accessor and reads the number of open fields from the first tuple of its current
+     * buffer. The accessor must therefore already be positioned on a frame holding at least one tuple.
+     * NOTE(review): the fixed offset 6 presumably points at the open-part offset slot of the serialized
+     * record - confirm against the record layout.
+     */
+    public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
+        this.frameAccessor = frameAccessor;
+        final int recordBase = frameAccessor.getTupleStartOffset(0) + frameAccessor.getFieldSlotsLength();
+        final int openPartOffset = frameAccessor.getBuffer().getInt(recordBase + 6);
+        this.numOpenFields = frameAccessor.getBuffer().getInt(recordBase + openPartOffset);
+    }
+
+    /**
+     * Reads the intake partition value embedded in the record of the given tuple.
+     * NOTE(review): the offset arithmetic assumes the partition attribute sits at a fixed position in the
+     * record's open part, after the field count and the per-field slots - confirm against the writer.
+     *
+     * @param tupleIndex index of the tuple within the current frame
+     * @return the int stored at the computed partition offset
+     */
+    public int getFeedIntakePartition(int tupleIndex) {
+        final ByteBuffer frameBuffer = frameAccessor.getBuffer();
+        final int recordBase = frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
+        final int openPartOffset = frameBuffer.getInt(recordBase + 6);
+        final int offsetInRecord = openPartOffset + 4 + 8 * numOpenFields
+                + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
+        return frameBuffer.getInt(recordBase + offsetInRecord);
+    }
+
+    // ---- plain delegation to the wrapped accessor ----
+
+    @Override
+    public int getFieldCount() {
+        return frameAccessor.getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return frameAccessor.getFieldSlotsLength();
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldLength(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return frameAccessor.getTupleEndOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return frameAccessor.getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return frameAccessor.getTupleCount();
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frameAccessor.getBuffer();
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        frameAccessor.reset(buffer);
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return frameAccessor.getTupleLength(tupleIndex);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
new file mode 100644
index 0000000..d43f90d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Adds feed bookkeeping attributes (tuple id, intake partition, intake/store timestamps) to a record
+ * that is being assembled with an {@link IARecordBuilder}.
+ * <p>
+ * The name/value storages and mutable value holders are reused across calls.
+ */
+public class FeedFrameTupleDecorator {
+
+    private AMutableString aString = new AMutableString("");
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AtomicInteger tupleId;
+
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+
+    private final int partition;
+    private final ArrayBackedValueStorage attrNameStorage;
+    private final ArrayBackedValueStorage attrValueStorage;
+
+    /**
+     * @param partition the value written by {@link #addIntakePartition(IARecordBuilder)}
+     */
+    public FeedFrameTupleDecorator(int partition) {
+        this.partition = partition;
+        this.tupleId = new AtomicInteger(0);
+        this.attrNameStorage = new ArrayBackedValueStorage();
+        this.attrValueStorage = new ArrayBackedValueStorage();
+    }
+
+    /**
+     * Serializes {@code attrName} into the reusable name storage, replacing its previous content.
+     */
+    private void writeAttributeName(String attrName) throws HyracksDataException {
+        attrNameStorage.reset();
+        aString.setValue(attrName);
+        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+    }
+
+    /**
+     * Adds a 64-bit integer field named {@code attrName} to the record under construction.
+     */
+    public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
+            throws HyracksDataException, AsterixException {
+        writeAttributeName(attrName);
+
+        attrValueStorage.reset();
+        aInt64.setValue(attrValue);
+        int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
+
+        recordBuilder.addField(attrNameStorage, attrValueStorage);
+    }
+
+    /**
+     * Adds a 32-bit integer field named {@code attrName} to the record under construction.
+     */
+    public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
+            throws HyracksDataException, AsterixException {
+        writeAttributeName(attrName);
+
+        attrValueStorage.reset();
+        aInt32.setValue(attrValue);
+        int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
+
+        recordBuilder.addField(attrNameStorage, attrValueStorage);
+    }
+
+    /** Adds the next tuple id (the internal counter is incremented first, so ids start at 1). */
+    public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        final int nextId = tupleId.incrementAndGet();
+        addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, nextId, recordBuilder);
+    }
+
+    /** Adds the partition this decorator was created with. */
+    public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
+    }
+
+    /** Adds the current wall-clock time in milliseconds as the intake timestamp. */
+    public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        final long now = System.currentTimeMillis();
+        addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, now, recordBuilder);
+    }
+
+    /** Adds the current wall-clock time in milliseconds as the store timestamp. */
+    public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        final long now = System.currentTimeMillis();
+        addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, now, recordBuilder);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
new file mode 100644
index 0000000..3a46b1a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.DataBucket.ContentType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedCongestionMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.watch.MonitoredBuffer;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides for error-handling and input-side buffering for a feed runtime.
+ * The input handler is buffering in:
+ * 1. FeedMetaComputeNodePushable.initializeNewFeedRuntime();
+ * 2. FeedMetaStoreNodePushable.initializeNewFeedRuntime();
+ * ______
+ * | |
+ * ============|core |============
+ * ============| op |============
+ * ^^^^^^^^^^^^|______|
+ * Input Side
+ * Handler
+ *
+ **/
+public class FeedRuntimeInputHandler implements IFrameWriter {
+
+ // The logger is never reassigned; declared final like the loggers of the sibling feed classes.
+ private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
+
+ private final FeedConnectionId connectionId;
+ private final FeedRuntimeId runtimeId;
+ // NOTE(review): feedPolicyAccessor and fpa are both assigned from the same constructor argument.
+ private final FeedPolicyAccessor feedPolicyAccessor;
+ private final IExceptionHandler exceptionHandler;
+ private final FeedFrameDiscarder discarder;
+ private final FeedFrameSpiller spiller;
+ private final FeedPolicyAccessor fpa;
+ private final IFeedManager feedManager;
+ private boolean bufferingEnabled;
+ private IFrameWriter coreOperator;
+ private MonitoredBuffer mBuffer;
+ private DataBucketPool pool;
+ private FrameCollection frameCollection;
+ // Current mode and the mode that was active before the most recent switch (see setMode).
+ private Mode mode;
+ private Mode lastMode;
+ private boolean finished;
+ private long nProcessed;
+ private boolean throttlingEnabled;
+
+ private FrameEventCallback frameEventCallback;
+
+ public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IFrameWriter coreOperator, FeedPolicyAccessor fpa, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+ IFeedManager feedManager, int nPartitions) throws HyracksDataException {
+ this(ctx, connectionId, runtimeId, coreOperator, fpa, fpa.bufferingEnabled(), fta, recordDesc, feedManager, // convenience overload: the buffering flag is read from the policy accessor
+ nPartitions);
+ }
+
+ public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
+ RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws HyracksDataException {
+ this.connectionId = connectionId;
+ this.runtimeId = runtimeId;
+ this.coreOperator = coreOperator;
+ this.bufferingEnabled = bufferingEnabled;
+ this.feedPolicyAccessor = fpa;
+ this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
+ this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this); // NOTE(review): 'this' is handed out before construction has finished
+ this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
+ this.mode = Mode.PROCESS; // the handler starts in PROCESS mode with PROCESS as its previous mode
+ this.lastMode = Mode.PROCESS;
+ this.finished = false;
+ this.fpa = fpa; // same accessor instance as feedPolicyAccessor above
+ this.feedManager = feedManager;
+ this.pool = (DataBucketPool) feedManager.getFeedMemoryManager() // bucket pool used by process() when buffering is enabled
+ .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
+ this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager() // collection used to hold frames while in STALL mode
+ .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
+ this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
+ this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
+ feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
+ nPartitions, fpa);
+ this.mBuffer.start(); // NOTE(review): the buffer is started before throttlingEnabled is initialised on the next line
+ this.throttlingEnabled = false;
+ }
+
+ /**
+  * Routes an incoming frame according to the current {@link Mode}.
+  * <p>
+  * In PROCESS mode any backlog left by a preceding SPILL / POST_SPILL_DISCARD or STALL phase is
+  * replayed first; the frame is then processed. In SPILL and DISCARD modes the frame is handed
+  * to the spiller or discarder. In STALL mode it is buffered until recovery, and in END / FAIL
+  * modes it is ignored with a warning.
+  *
+  * @param frame
+  *            the incoming frame
+  * @throws HyracksDataException
+  *             if handling the frame fails; an exception that already is a
+  *             {@code HyracksDataException} is propagated as-is rather than wrapped again
+  */
+ @Override
+ public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+     try {
+         switch (mode) {
+             case PROCESS:
+                 // Drain whatever accumulated during the previous mode before taking new data.
+                 switch (lastMode) {
+                     case SPILL:
+                     case POST_SPILL_DISCARD:
+                         setMode(Mode.PROCESS_SPILL);
+                         processSpilledBacklog();
+                         break;
+                     case STALL:
+                         setMode(Mode.PROCESS_BACKLOG);
+                         processBufferredBacklog();
+                         break;
+                     default:
+                         break;
+                 }
+                 process(frame);
+                 break;
+             case PROCESS_BACKLOG:
+             case PROCESS_SPILL:
+                 process(frame);
+                 break;
+             case SPILL:
+                 spill(frame);
+                 break;
+             case DISCARD:
+             case POST_SPILL_DISCARD:
+                 discard(frame);
+                 break;
+             case STALL:
+                 switch (runtimeId.getFeedRuntimeType()) {
+                     case COLLECT:
+                     case COMPUTE_COLLECT:
+                     case COMPUTE:
+                     case STORE:
+                         bufferDataUntilRecovery(frame);
+                         break;
+                     default:
+                         if (LOGGER.isLoggable(Level.WARNING)) {
+                             LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
+                         }
+                         break;
+                 }
+                 break;
+             case END:
+             case FAIL:
+                 if (LOGGER.isLoggable(Level.WARNING)) {
+                     LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
+                 }
+                 break;
+         }
+     } catch (HyracksDataException e) {
+         // Already the declared exception type: log through the logger and rethrow unchanged.
+         LOGGER.log(Level.SEVERE, "Failure handling frame in " + mode + " mode " + this.runtimeId, e);
+         throw e;
+     } catch (Exception e) {
+         LOGGER.log(Level.SEVERE, "Failure handling frame in " + mode + " mode " + this.runtimeId, e);
+         throw new HyracksDataException(e);
+     }
+ }
+
+ private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
+ }
+ if (frameCollection == null) {
+ this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
+ .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
+ }
+ if (frameCollection == null) {
+ discarder.processMessage(frame);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Running low on memory! DISCARDING FRAME ");
+ }
+ } else {
+ boolean success = frameCollection.addFrame(frame);
+ if (!success) {
+ if (fpa.spillToDiskOnCongestion()) {
+ if (frame != null) {
+ spiller.processMessage(frame);
+ } // TODO handle the else case
+ } else {
+ discarder.processMessage(frame);
+ }
+ }
+ }
+ }
+
+ /**
+  * Reports congestion that the local policy could not resolve.
+  * <p>
+  * For a COMPUTE runtime a {@link FeedCongestionMessage} carrying the buffer's inflow and
+  * outflow rates and the current mode is sent through the feed message service. For every
+  * other runtime type only a warning is logged.
+  */
+ public void reportUnresolvableCongestion() throws HyracksDataException {
+     final boolean computeRuntime = this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE);
+     if (!computeRuntime) {
+         if (LOGGER.isLoggable(Level.WARNING)) {
+             LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
+         }
+         return;
+     }
+
+     FeedCongestionMessage congestionReport = new FeedCongestionMessage(connectionId, runtimeId,
+             mBuffer.getInflowRate(), mBuffer.getOutflowRate(), mode);
+     feedManager.getFeedMessageService().sendMessage(congestionReport);
+     if (LOGGER.isLoggable(Level.WARNING)) {
+         LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
+     }
+ }
+
+ /**
+  * Replays the frames buffered in the frame collection, then sends an EOSD bucket to the
+  * monitored buffer and releases the collection. Does nothing beyond logging when no frame
+  * collection is held.
+  *
+  * @throws HyracksDataException
+  *             if replay fails or no data bucket can be obtained for the EOSD marker; an
+  *             exception that already is a {@code HyracksDataException} is not wrapped again
+  */
+ private void processBufferredBacklog() throws HyracksDataException {
+     try {
+         if (LOGGER.isLoggable(Level.INFO)) {
+             LOGGER.info("Processing backlog " + this.runtimeId);
+         }
+
+         if (frameCollection != null) {
+             Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
+             while (backlog.hasNext()) {
+                 process(backlog.next());
+                 nProcessed++;
+             }
+             DataBucket bucket = pool.getDataBucket();
+             if (bucket == null) {
+                 // The pool can run dry (process() checks for this too); fail with a clear
+                 // message instead of a NullPointerException.
+                 throw new HyracksDataException(
+                         "Unable to obtain a data bucket to mark the end of the buffered backlog for "
+                                 + this.runtimeId);
+             }
+             bucket.setContentType(ContentType.EOSD);
+             bucket.setDesiredReadCount(1);
+             mBuffer.sendMessage(bucket);
+             feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+             frameCollection = null;
+         }
+     } catch (HyracksDataException e) {
+         LOGGER.log(Level.SEVERE, "Unable to process buffered backlog for " + this.runtimeId, e);
+         throw e;
+     } catch (Exception e) {
+         LOGGER.log(Level.SEVERE, "Unable to process buffered backlog for " + this.runtimeId, e);
+         throw new HyracksDataException(e);
+     }
+ }
+
+ /**
+  * Replays the frames returned by the spiller, then sends an EOSD bucket to the monitored
+  * buffer and resets the spiller.
+  *
+  * @throws HyracksDataException
+  *             if replay fails or no data bucket can be obtained for the EOSD marker; an
+  *             exception that already is a {@code HyracksDataException} is not wrapped again
+  */
+ private void processSpilledBacklog() throws HyracksDataException {
+     try {
+         Iterator<ByteBuffer> backlog = spiller.replayData();
+         while (backlog.hasNext()) {
+             process(backlog.next());
+             nProcessed++;
+         }
+         DataBucket bucket = pool.getDataBucket();
+         if (bucket == null) {
+             // The pool can run dry (process() checks for this too); fail with a clear
+             // message instead of a NullPointerException.
+             throw new HyracksDataException(
+                     "Unable to obtain a data bucket to mark the end of the spilled backlog for "
+                             + this.runtimeId);
+         }
+         bucket.setContentType(ContentType.EOSD);
+         bucket.setDesiredReadCount(1);
+         mBuffer.sendMessage(bucket);
+         spiller.reset();
+     } catch (HyracksDataException e) {
+         LOGGER.log(Level.SEVERE, "Unable to process spilled backlog for " + this.runtimeId, e);
+         throw e;
+     } catch (Exception e) {
+         LOGGER.log(Level.SEVERE, "Unable to process spilled backlog for " + this.runtimeId, e);
+         throw new HyracksDataException(e);
+     }
+ }
+
+ /**
+  * Delivers one frame to the core operator, retrying after soft failures.
+  * <p>
+  * Without buffering the frame is pushed synchronously to the core operator; a {@code null}
+  * frame marks this handler finished and notifies threads waiting on the core operator. With
+  * buffering the frame is copied into a pooled {@link DataBucket} (DATA, or EOD for a
+  * {@code null} frame) and sent to the monitored buffer. When the pool has no bucket the feed
+  * policy decides: spill, discard, enable throttling, or report unresolvable congestion.
+  * <p>
+  * On an exception, a policy with continue-on-soft-failure lets the exception handler return
+  * the frame to retry; a {@code null} result ends the retry loop. Otherwise the monitored
+  * buffer is closed and the exception is rethrown wrapped.
+  *
+  * @param frame
+  *            the frame to process, or {@code null} to signal end of data
+  * @throws HyracksDataException
+  *             if processing fails and the policy does not continue on soft failure
+  */
+ protected void process(ByteBuffer frame) throws HyracksDataException {
+     boolean frameProcessed = false;
+     while (!frameProcessed) {
+         try {
+             if (!bufferingEnabled) {
+                 if (frame == null) {
+                     setFinished(true);
+                     synchronized (coreOperator) {
+                         coreOperator.notifyAll();
+                     }
+                 } else {
+                     coreOperator.nextFrame(frame); // synchronous
+                     mBuffer.sendReport(frame);
+                 }
+             } else {
+                 DataBucket bucket = pool.getDataBucket();
+                 if (bucket != null) {
+                     if (frame != null) {
+                         bucket.reset(frame); // created a copy here
+                         bucket.setContentType(ContentType.DATA);
+                     } else {
+                         bucket.setContentType(ContentType.EOD);
+                     }
+                     bucket.setDesiredReadCount(1);
+                     mBuffer.sendMessage(bucket);
+                     mBuffer.sendReport(frame);
+                     nProcessed++;
+                 } else {
+                     // No bucket available: fall back to whatever the ingestion policy allows.
+                     if (fpa.spillToDiskOnCongestion()) {
+                         if (frame != null) {
+                             boolean spilled = spiller.processMessage(frame);
+                             if (spilled) {
+                                 setMode(Mode.SPILL);
+                             } else {
+                                 reportUnresolvableCongestion();
+                             }
+                         }
+                     } else if (fpa.discardOnCongestion()) {
+                         boolean discarded = discarder.processMessage(frame);
+                         if (!discarded) {
+                             reportUnresolvableCongestion();
+                         }
+                     } else if (fpa.throttlingEnabled()) {
+                         setThrottlingEnabled(true);
+                     } else {
+                         reportUnresolvableCongestion();
+                     }
+
+                 }
+             }
+             frameProcessed = true;
+         } catch (Exception e) {
+             // Report through the logger (with the stack trace) instead of printing to stderr.
+             if (LOGGER.isLoggable(Level.WARNING)) {
+                 LOGGER.log(Level.WARNING, "Exception while processing frame in " + this.runtimeId, e);
+             }
+             if (feedPolicyAccessor.continueOnSoftFailure()) {
+                 // The handler returns the frame to retry with, or null if it cannot recover one.
+                 frame = exceptionHandler.handleException(e, frame);
+                 if (frame == null) {
+                     frameProcessed = true;
+                     if (LOGGER.isLoggable(Level.WARNING)) {
+                         LOGGER.warning("Encountered exception! " + e.getMessage()
+                                 + " Insufficient information, Cannot extract failing tuple");
+                     }
+                 }
+             } else {
+                 if (LOGGER.isLoggable(Level.WARNING)) {
+                     LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
+                 }
+                 mBuffer.close(false);
+                 throw new HyracksDataException(e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Hands the frame to the spiller. When the spiller rejects it, the handler switches to
+  * POST_SPILL_DISCARD mode and reports unresolvable congestion.
+  */
+ private void spill(ByteBuffer frame) throws Exception {
+     if (spiller.processMessage(frame)) {
+         return;
+     }
+     // limit reached
+     setMode(Mode.POST_SPILL_DISCARD);
+     reportUnresolvableCongestion();
+ }
+
+ private void discard(ByteBuffer frame) throws Exception {
+ boolean success = discarder.processMessage(frame);
+ if (!success) { // limit reached
+ reportUnresolvableCongestion();
+ }
+ }
+
+ public Mode getMode() { // NOTE(review): unsynchronized read, whereas setMode is synchronized
+ return mode;
+ }
+
+ /**
+  * Switches the handler to the given mode, remembering the previous one in {@code lastMode}.
+  * Setting the mode that is already active is a no-op. Switching to END also closes the handler.
+  */
+ public synchronized void setMode(Mode mode) {
+     if (!mode.equals(this.mode)) {
+         final Mode previous = this.mode;
+         this.lastMode = previous;
+         this.mode = mode;
+         if (mode.equals(Mode.END)) {
+             this.close();
+         }
+         if (LOGGER.isLoggable(Level.INFO)) {
+             LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
+         }
+     }
+ }
+
+ /**
+  * Releases the frame collection and bucket pool (when held) back to the feed memory manager
+  * and closes the monitored buffer. Monitoring is disabled on close unless the handler is in
+  * STALL mode. Does nothing when there is no monitored buffer.
+  */
+ @Override
+ public void close() {
+     if (mBuffer == null) {
+         return;
+     }
+     final boolean disableMonitoring = !this.mode.equals(Mode.STALL);
+     if (frameCollection != null) {
+         feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+     }
+     if (pool != null) {
+         feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
+     }
+     mBuffer.close(false, disableMonitoring);
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring "
+                 + disableMonitoring + " Mode for runtime " + this.mode);
+     }
+ }
+
+ public IFrameWriter getCoreOperator() {
+ return coreOperator;
+ }
+
+ public void setCoreOperator(IFrameWriter coreOperator) { // replaces the core operator here, in the monitored buffer and in the frame event callback
+ this.coreOperator = coreOperator;
+ mBuffer.setFrameWriter(coreOperator);
+ frameEventCallback.setCoreOperator(coreOperator);
+ }
+
+ public boolean isFinished() { // set to true by process() when a null frame arrives with buffering disabled
+ return finished;
+ }
+
+ public void setFinished(boolean finished) {
+ this.finished = finished;
+ }
+
+ public long getProcessed() { // value of the nProcessed counter
+ return nProcessed;
+ }
+
+ public FeedRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ @Override
+ public void open() throws HyracksDataException { // delegates directly to the core operator
+ coreOperator.open();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException { // delegates directly to the core operator
+ coreOperator.fail();
+ }
+
+ /**
+  * Updates the number of partitions on the monitored buffer and resets it.
+  * <p>
+  * The buffer is checked for {@code null} before it is touched; without a buffer the call only
+  * logs a warning.
+  *
+  * @param nPartitions
+  *            the new number of partitions
+  */
+ public void reset(int nPartitions) {
+     if (mBuffer == null) {
+         if (LOGGER.isLoggable(Level.WARNING)) {
+             LOGGER.warning("No monitored buffer to reset for " + this.runtimeId);
+         }
+         return;
+     }
+     mBuffer.setNumberOfPartitions(nPartitions);
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
+     }
+     mBuffer.reset();
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public IFeedManager getFeedManager() {
+ return feedManager;
+ }
+
+ public MonitoredBuffer getmBuffer() { // returns the monitored buffer created in the constructor
+ return mBuffer;
+ }
+
+ public boolean isThrottlingEnabled() { // flag maintained by setThrottlingEnabled
+ return throttlingEnabled;
+ }
+
+ /**
+  * Updates the throttling flag. When the value actually changes, a
+  * {@link ThrottlingEnabledFeedMessage} is sent through the feed message service and a warning
+  * is logged. Setting the current value again does nothing.
+  */
+ public void setThrottlingEnabled(boolean throttlingEnabled) {
+     if (this.throttlingEnabled == throttlingEnabled) {
+         return;
+     }
+     this.throttlingEnabled = throttlingEnabled;
+     final IFeedMessage notification = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+     feedManager.getFeedMessageService().sendMessage(notification);
+     if (LOGGER.isLoggable(Level.WARNING)) {
+         LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
+     }
+ }
+
+ public boolean isBufferingEnabled() { // process() pushes frames synchronously to the core operator when this is false
+ return bufferingEnabled;
+ }
+
+ public void setBufferingEnabled(boolean bufferingEnabled) {
+ this.bufferingEnabled = bufferingEnabled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
new file mode 100644
index 0000000..7980712
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
+
+/**
+ * Represents an expandable collection of frames.
+ */
+ public class FrameCollection implements IFeedMemoryComponent {
+
+     /** A unique identifier for the feed memory component **/
+     private final int componentId;
+
+     /** A collection of frames (each being a ByteBuffer) **/
+     private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
+
+     /** The permitted maximum size, the collection may grow to **/
+     private int maxSize;
+
+     /** The {@link IFeedMemoryManager} for the NodeController **/
+     private final IFeedMemoryManager memoryManager;
+
+     public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
+         this.componentId = componentId;
+         this.maxSize = maxSize;
+         this.memoryManager = memoryManager;
+     }
+
+     /**
+      * Stores a copy of the given frame.
+      * <p>
+      * When the collection has reached its permitted size, the memory manager is asked to expand
+      * it; the frame is rejected if the expansion is not granted. The bytes between the frame's
+      * position and limit are copied through a duplicate view, so the caller's buffer position
+      * and limit are left untouched.
+      *
+      * @param frame
+      *            the frame to copy into the collection
+      * @return {@code true} if the frame was stored, {@code false} if the collection is full and
+      *         could not be expanded
+      */
+     public boolean addFrame(ByteBuffer frame) {
+         // ">=" rather than "==" so the limit also holds if the size ever exceeds maxSize.
+         if (frames.size() >= maxSize) {
+             boolean expansionGranted = memoryManager.expandMemoryComponent(this);
+             if (!expansionGranted) {
+                 return false;
+             }
+         }
+         ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
+         // duplicate() shares the content but has its own position/limit, so reading from it
+         // does not advance the caller's frame.
+         storageBuffer.put(frame.duplicate());
+         storageBuffer.flip();
+         frames.add(storageBuffer);
+         return true;
+     }
+
+     /** Returns an iterator over the stored frame copies, in insertion order. */
+     public Iterator<ByteBuffer> getFrameCollectionIterator() {
+         return frames.iterator();
+     }
+
+     /** Returns the number of frames currently stored. */
+     @Override
+     public int getTotalAllocation() {
+         return frames.size();
+     }
+
+     @Override
+     public Type getType() {
+         return Type.COLLECTION;
+     }
+
+     @Override
+     public int getComponentId() {
+         return componentId;
+     }
+
+     /** Raises the permitted maximum size by {@code delta} frames. */
+     @Override
+     public void expand(int delta) {
+         maxSize = maxSize + delta;
+     }
+
+     /** Drops all stored frames and restores the initial permitted size. */
+     @Override
+     public void reset() {
+         frames.clear();
+         maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
+     }
+
+     @Override
+     public String toString() {
+         return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
new file mode 100644
index 0000000..543efb2
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FrameDistributor {
+
+ private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
+
+ private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
+
+ private final FeedId feedId;
+ private final FeedRuntimeType feedRuntimeType;
+ private final int partition;
+ private final IFeedMemoryManager memoryManager;
+ private final boolean enableSynchronousTransfer;
+ /** A map storing the registered frame readers ({@link FeedFrameCollector}), keyed by their frame writer. **/
+ private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
+ private final FrameTupleAccessor fta;
+
+ private DataBucketPool pool;
+ private DistributionMode distributionMode;
+ private boolean spillToDiskRequired = false;
+
+ public enum DistributionMode {
+ /**
+ * A single feed frame collector is registered for receiving tuples.
+ * When synchronous transfer is enabled, tuples are handed over via a synchronous call, that is, no buffering is involved.
+ **/
+ SINGLE,
+
+ /**
+ * Multiple feed frame collectors are concurrently registered for
+ * receiving tuples.
+ **/
+ SHARED,
+
+ /**
+ * Feed tuples are not being processed, irrespective of # of registered
+ * feed frame collectors.
+ **/
+ INACTIVE
+ }
+
+ public FrameDistributor(FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
+ boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
+ throws HyracksDataException {
+ this.feedId = feedId;
+ this.feedRuntimeType = feedRuntimeType;
+ this.partition = partition;
+ this.memoryManager = memoryManager;
+ this.enableSynchronousTransfer = enableSynchronousTransfer;
+ this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
+ this.distributionMode = DistributionMode.INACTIVE; // no collector is registered yet; the pool is obtained later, on registration
+ this.fta = fta;
+ }
+
+ /**
+  * Sends an end-of-data bucket to every registered collector.
+  * <p>
+  * If no bucket is available, the pool is polled every {@code MEMORY_AVAILABLE_POLL_PERIOD}
+  * milliseconds until one is. If the thread is interrupted while waiting, the interrupt status
+  * is restored and the method returns without sending anything.
+  */
+ public void notifyEndOfFeed() {
+     DataBucket bucket = getDataBucket();
+     while (bucket == null) {
+         try {
+             Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
+         } catch (InterruptedException e) {
+             // Give up waiting, but keep the interruption visible to the caller.
+             Thread.currentThread().interrupt();
+             if (LOGGER.isLoggable(Level.WARNING)) {
+                 LOGGER.warning("Interrupted while waiting for memory; end of feed not signalled for " + feedId);
+             }
+             return;
+         }
+         bucket = getDataBucket();
+     }
+     sendEndOfFeedDataBucket(bucket);
+ }
+
+ private void sendEndOfFeedDataBucket(DataBucket bucket) {
+ bucket.setContentType(DataBucket.ContentType.EOD); // mark the bucket as the end-of-data signal
+ nextBucket(bucket); // pass it to every registered collector
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("End of feed data packet sent " + this.feedId);
+ }
+ }
+
+ public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
+ DistributionMode currentMode = distributionMode; // remembered only for the log message at the end
+ switch (distributionMode) {
+ case INACTIVE: // first collector: INACTIVE -> SINGLE
+ if (!enableSynchronousTransfer) { // a pool and a started collector are only needed for bucket-based transfer
+ pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+ frameCollector.start();
+ }
+ registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+ setMode(DistributionMode.SINGLE);
+ break;
+ case SINGLE: // second collector: SINGLE -> SHARED
+ pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+ registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+ for (FeedFrameCollector reader : registeredCollectors.values()) { // start() is called on every registered collector, including the existing one
+ reader.start();
+ }
+ setMode(DistributionMode.SHARED);
+ break;
+ case SHARED: // further collectors: the mode stays SHARED
+ frameCollector.start();
+ registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+ break;
+ }
+ evaluateIfSpillIsEnabled();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(
+ "Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id " + feedId);
+ }
+ }
+
+ /**
+  * Closes the given collector and removes it from the set of registered collectors.
+  * <p>
+  * In SHARED mode the distributor drops to SINGLE once only one collector is left; that
+  * remaining collector is put into TRANSITION state and closed, and the bucket pool is
+  * released. In SINGLE mode the distributor becomes INACTIVE.
+  * <p>
+  * The collector is removed from the map in SINGLE mode as well. A later registration would
+  * otherwise leave two entries in the map while in SINGLE mode, and {@code nextFrame} could
+  * pick the closed one.
+  *
+  * @param frameCollector
+  *            the collector to deregister
+  * @throws IllegalStateException
+  *             if the distributor is INACTIVE
+  */
+ public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
+     switch (distributionMode) {
+         case INACTIVE:
+             throw new IllegalStateException(
+                     "Invalid attempt to deregister frame collector in " + distributionMode + " mode.");
+         case SHARED:
+             frameCollector.closeCollector();
+             registeredCollectors.remove(frameCollector.getFrameWriter());
+             int nCollectors = registeredCollectors.size();
+             if (nCollectors == 1) {
+                 FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
+                 setMode(DistributionMode.SINGLE);
+                 loneCollector.setState(FeedFrameCollector.State.TRANSITION);
+                 loneCollector.closeCollector();
+                 memoryManager.releaseMemoryComponent(pool);
+                 evaluateIfSpillIsEnabled();
+             } else {
+                 if (!spillToDiskRequired) {
+                     evaluateIfSpillIsEnabled();
+                 }
+             }
+             break;
+         case SINGLE:
+             frameCollector.closeCollector();
+             // Drop the entry so no stale, closed collector stays registered.
+             registeredCollectors.remove(frameCollector.getFrameWriter());
+             setMode(DistributionMode.INACTIVE);
+             spillToDiskRequired = false;
+             break;
+
+     }
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
+     }
+ }
+
+ /**
+  * Turns {@code spillToDiskRequired} on as soon as any registered collector's policy asks for
+  * spilling on congestion. Once the flag is set this method leaves it alone.
+  */
+ public void evaluateIfSpillIsEnabled() {
+     if (spillToDiskRequired) {
+         return;
+     }
+     for (FeedFrameCollector candidate : registeredCollectors.values()) {
+         if (candidate.getFeedPolicyAccessor().spillToDiskOnCongestion()) {
+             spillToDiskRequired = true;
+             break;
+         }
+     }
+ }
+
+ /**
+  * Deregisters the collector registered under the given frame writer.
+  *
+  * @return {@code true} if a collector was found and deregistered, {@code false} otherwise
+  */
+ public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
+     final FeedFrameCollector registered = registeredCollectors.get(frameWriter);
+     if (registered == null) {
+         return false;
+     }
+     deregisterFrameCollector(registered);
+     return true;
+ }
+
+ public synchronized void setMode(DistributionMode mode) { // plain assignment; no transition checks are performed here
+ this.distributionMode = mode;
+ }
+
+ public boolean isRegistered(IFrameWriter writer) { // true when a collector is mapped to this writer
+ return registeredCollectors.get(writer) != null;
+ }
+
+ /**
+  * Distributes a frame according to the current {@link DistributionMode}.
+  * <p>
+  * INACTIVE drops the frame. SINGLE passes it to the only collector, synchronously when
+  * synchronous transfer is enabled and the collector is ACTIVE or in HANDOVER, otherwise
+  * through a data bucket. SHARED always goes through a data bucket.
+  * <p>
+  * When the single collector is FINISHED the frame is discarded with a warning. The collector
+  * stays registered, since the map is keyed by {@code IFrameWriter} and the previous
+  * {@code get(0)} / {@code remove(0)} calls never matched an entry. Removal and the matching
+  * mode change are left to {@code deregisterFrameCollector}.
+  *
+  * @param frame
+  *            the frame to distribute
+  */
+ public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+     switch (distributionMode) {
+         case INACTIVE:
+             break;
+         case SINGLE:
+             FeedFrameCollector collector = registeredCollectors.values().iterator().next();
+             switch (collector.getState()) {
+                 case HANDOVER:
+                 case ACTIVE:
+                     if (enableSynchronousTransfer) {
+                         collector.nextFrame(frame); // processing is synchronous
+                     } else {
+                         handleDataBucket(frame);
+                     }
+                     break;
+                 case TRANSITION:
+                     handleDataBucket(frame);
+                     break;
+                 case FINISHED:
+                     if (LOGGER.isLoggable(Level.WARNING)) {
+                         // Name the collector that finished.
+                         LOGGER.warning("Discarding fetched tuples, feed has ended [" + collector
+                                 + "]" + " Feed Id " + feedId + " frame distributor " + this.getFeedRuntimeType());
+                     }
+                     break;
+             }
+             break;
+         case SHARED:
+             handleDataBucket(frame);
+             break;
+     }
+ }
+
+ private void nextBucket(DataBucket bucket) { // offers the same bucket to every registered collector
+ for (FeedFrameCollector collector : registeredCollectors.values()) {
+ collector.sendMessage(bucket); // asynchronous call
+ }
+ }
+
+ private void handleDataBucket(ByteBuffer frame) throws HyracksDataException {
+ DataBucket bucket = getDataBucket();
+ if (bucket == null) {
+ handleFrameDuringMemoryCongestion(frame);
+ } else {
+ bucket.reset(frame);
+ bucket.setDesiredReadCount(registeredCollectors.size());
+ nextBucket(bucket);
+ }
+ }
+
+ private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
+ }
+ // NOTE(review): nothing else happens here; the frame is neither spilled nor retried, and no waiting for memory is implemented
+ }
+
+ private DataBucket getDataBucket() {
+ DataBucket bucket = null;
+ if (pool != null) {
+ bucket = pool.getDataBucket();
+ if (bucket != null) {
+ bucket.setDesiredReadCount(registeredCollectors.size());
+ return bucket;
+ } else {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ public DistributionMode getMode() { // returns the same field as getDistributionMode()
+ return distributionMode;
+ }
+
+ public void close() {
+ switch (distributionMode) {
+ case INACTIVE: // nothing to shut down, only logged
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("FrameDistributor is " + distributionMode);
+ }
+ break;
+ case SINGLE:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode " + " for feedId "
+ + feedId + " " + this.feedRuntimeType);
+ }
+ setMode(DistributionMode.INACTIVE); // later nextFrame calls drop their frames
+ if (!enableSynchronousTransfer) { // an end-of-data bucket is only sent for bucket-based transfer
+ notifyEndOfFeed(); // send EOD Data Bucket
+ waitForCollectorsToFinish();
+ }
+ registeredCollectors.values().iterator().next().disconnect(); // NOTE(review): throws NoSuchElementException if the map is empty
+ break;
+ case SHARED:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
+ }
+ notifyEndOfFeed(); // send EOD Data Bucket
+ waitForCollectorsToFinish(); // the mode is left as SHARED in this branch
+ break;
+ }
+ }
+
+ private void waitForCollectorsToFinish() {
+ synchronized (registeredCollectors.values()) {
+ while (!allCollectorsFinished()) {
+ try {
+ registeredCollectors.values().wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+  * Tells whether every registered collector is in the FINISHED state. Returns {@code true}
+  * when no collector is registered.
+  */
+ private boolean allCollectorsFinished() {
+     for (FeedFrameCollector candidate : registeredCollectors.values()) {
+         if (!candidate.getState().equals(FeedFrameCollector.State.FINISHED)) {
+             return false;
+         }
+     }
+     return true;
+ }
+
+ public Collection<FeedFrameCollector> getRegisteredCollectors() { // live view of the map's values, not a copy
+ return registeredCollectors.values();
+ }
+
+ public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() { // exposes the internal map itself
+ return registeredCollectors;
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ public DistributionMode getDistributionMode() { // returns the same field as getMode()
+ return distributionMode;
+ }
+
+ public FeedRuntimeType getFeedRuntimeType() {
+ return feedRuntimeType;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public FrameTupleAccessor getFta() {
+ return fta;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
new file mode 100644
index 0000000..ba67862
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Translates {@link FrameEvent}s into state changes on a {@link FeedRuntimeInputHandler}, using the
+ * feed policy to decide how congestion is handled, and wakes up threads waiting on the core operator
+ * once processing has finished.
+ */
+public class FrameEventCallback implements IFrameEventCallback {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
+
+    private final FeedPolicyAccessor fpa;
+    private final FeedRuntimeInputHandler inputSideHandler;
+    private IFrameWriter coreOperator;
+
+    /**
+     * @param fpa
+     *            accessor for the feed policy consulted when congestion is reported
+     * @param inputSideHandler
+     *            the input handler whose mode/flags are updated in response to events
+     * @param coreOperator
+     *            the writer whose monitor is notified on {@code FINISHED_PROCESSING}
+     */
+    public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
+            IFrameWriter coreOperator) {
+        this.fpa = fpa;
+        this.inputSideHandler = inputSideHandler;
+        this.coreOperator = coreOperator;
+    }
+
+    /**
+     * Handles a single frame event.
+     * <p>
+     * While the input handler is in {@code PROCESS_SPILL} mode every event except
+     * {@code FINISHED_PROCESSING_SPILLAGE} is ignored.
+     *
+     * @param event
+     *            the event to handle
+     */
+    @Override
+    public void frameEvent(FrameEvent event) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
+        }
+        if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
+                && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
+            return;
+        }
+        switch (event) {
+            case PENDING_WORK_THRESHOLD_REACHED:
+                handleCongestion();
+                break;
+            case FINISHED_PROCESSING:
+                inputSideHandler.setFinished(true);
+                notifyCoreOperator();
+                break;
+            case PENDING_WORK_DONE:
+                handlePendingWorkDone(event);
+                break;
+            case FINISHED_PROCESSING_SPILLAGE:
+                inputSideHandler.setMode(Mode.PROCESS);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Applies the first congestion strategy enabled by the policy (spill, then discard, then throttle);
+     * if none is enabled, reports the congestion as unresolvable.
+     */
+    private void handleCongestion() {
+        if (fpa.spillToDiskOnCongestion()) {
+            inputSideHandler.setMode(Mode.SPILL);
+        } else if (fpa.discardOnCongestion()) {
+            inputSideHandler.setMode(Mode.DISCARD);
+        } else if (fpa.throttlingEnabled()) {
+            inputSideHandler.setThrottlingEnabled(true);
+        } else {
+            try {
+                inputSideHandler.reportUnresolvableCongestion();
+            } catch (HyracksDataException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    // Attach the exception so the reason for the failed report is not lost.
+                    LOGGER.log(Level.WARNING, "Unable to report congestion!!!", e);
+                }
+            }
+        }
+    }
+
+    /** Wakes up all threads waiting on the core operator's monitor. */
+    private void notifyCoreOperator() {
+        // Snapshot the field: it can be replaced through setCoreOperator(), and notifyAll() must be
+        // invoked on exactly the object whose monitor is held.
+        final IFrameWriter operator = coreOperator;
+        synchronized (operator) {
+            operator.notifyAll();
+        }
+    }
+
+    /**
+     * Switches the input handler back to {@code PROCESS} when it was spilling or discarding; in any
+     * other mode the event is only logged.
+     */
+    private void handlePendingWorkDone(FrameEvent event) {
+        switch (inputSideHandler.getMode()) {
+            case SPILL:
+            case DISCARD:
+            case POST_SPILL_DISCARD:
+                inputSideHandler.setMode(Mode.PROCESS);
+                break;
+            default:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
+                }
+                break;
+        }
+    }
+
+    /**
+     * Replaces the writer whose monitor is notified on {@code FINISHED_PROCESSING}.
+     *
+     * @param coreOperator
+     *            the new core operator
+     */
+    public void setCoreOperator(IFrameWriter coreOperator) {
+        this.coreOperator = coreOperator;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
new file mode 100644
index 0000000..22dcfac
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Keeps, per intake partition, the {@link IntakePartitionStatistics} of acknowledged record ids
+ * (grouped by ack-window base) and the average intake-to-persistence delay of the most recently
+ * processed non-empty frame.
+ */
+public class StorageFrameHandler {
+
+    /** partition -> (ack-window base -> statistics for that window). */
+    private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
+    private long avgDelayPersistence;
+
+    public StorageFrameHandler() {
+        intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
+        avgDelayPersistence = 0L;
+    }
+
+    /**
+     * Acknowledges every record in the frame, stamps each record with the current time as its store
+     * timestamp, and recomputes the average persistence delay from this frame's records.
+     * <p>
+     * A frame without tuples leaves the statistics and the average delay untouched.
+     * <p>
+     * NOTE(review): the byte offsets below presumably follow the serialized record layout (open-part
+     * offset at {@code recordStart + 6}, then the tuple id, partition, intake timestamp and store
+     * timestamp as consecutive open fields) -- confirm against the code that writes these records.
+     *
+     * @param frame
+     *            buffer holding the frame; store timestamps are written into it in place
+     * @param frameAccessor
+     *            accessor used to obtain the tuple count and tuple offsets for {@code frame}
+     */
+    public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
+        int nTuples = frameAccessor.getTupleCount();
+        if (nTuples <= 0) {
+            // Nothing to acknowledge; also avoids dividing by zero when averaging below.
+            return;
+        }
+        long delay = 0;
+        long currentTime = System.currentTimeMillis();
+        for (int i = 0; i < nTuples; i++) {
+            int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
+            int openPartOffsetOrig = frame.getInt(recordStart + 6);
+            int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
+
+            int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                    + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
+            int recordId = frame.getInt(recordStart + recordIdOffset);
+
+            int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
+            int partition = frame.getInt(recordStart + partitionOffset);
+
+            ackRecordId(partition, recordId);
+
+            int intakeTimestampValueOffset = partitionOffset + 4
+                    + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
+            long intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
+
+            int storeTimestampValueOffset = intakeTimestampValueOffset + 8
+                    + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
+            frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
+
+            // The delay is measured against the time captured before the loop started.
+            delay += currentTime - intakeTimestamp;
+        }
+        avgDelayPersistence = delay / nTuples;
+    }
+
+    /**
+     * Records the acknowledgement of {@code recordId} in the statistics of its ack window, creating the
+     * per-partition map and the window statistics on first use.
+     */
+    private void ackRecordId(int partition, int recordId) {
+        Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
+        if (map == null) {
+            map = new HashMap<Integer, IntakePartitionStatistics>();
+            intakeStatistics.put(partition, map);
+        }
+        int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
+        IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
+        if (intakeStatsForBaseOfPartition == null) {
+            intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
+            map.put(base, intakeStatsForBaseOfPartition);
+        }
+        intakeStatsForBaseOfPartition.ackRecordId(recordId);
+    }
+
+    /**
+     * Looks up the ack information for one ack window of a partition. This method is not synchronized.
+     *
+     * @param partition
+     *            the intake partition
+     * @param base
+     *            the ack-window base
+     * @return the ack info of the matching statistics, or {@code null} if the partition or the base is
+     *         unknown
+     */
+    public byte[] getAckData(int partition, int base) {
+        Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
+        if (intakeStats != null) {
+            IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
+            if (intakePartitionStats != null) {
+                return intakePartitionStats.getAckInfo();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns a snapshot of the per-base statistics of a partition.
+     * <p>
+     * The returned map is a shallow copy: adding or removing entries does not affect this handler, but
+     * the {@link IntakePartitionStatistics} values are the same objects this handler updates.
+     *
+     * @param partition
+     *            the intake partition
+     * @return a new map from ack-window base to statistics; empty if nothing was recorded for the
+     *         partition
+     */
+    public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
+        Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
+        if (intakeStatsForPartition == null) {
+            return new HashMap<Integer, IntakePartitionStatistics>();
+        }
+        // Hand out the copy, not the internal map, so callers can iterate without racing with updates.
+        return new HashMap<Integer, IntakePartitionStatistics>(intakeStatsForPartition);
+    }
+
+    /**
+     * @return the average persistence delay computed from the last non-empty frame, or the value last
+     *         set through {@link #setAvgDelayPersistence(long)}
+     */
+    public long getAvgDelayPersistence() {
+        return avgDelayPersistence;
+    }
+
+    public void setAvgDelayPersistence(long avgDelayPersistence) {
+        this.avgDelayPersistence = avgDelayPersistence;
+    }
+
+    /**
+     * @return the key-set view of the partitions for which statistics exist (not a copy)
+     */
+    public Set<Integer> getPartitionsWithStats() {
+        return intakeStatistics.keySet();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
new file mode 100644
index 0000000..9f861d4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedCollectInfo extends FeedInfo {
+ public FeedId sourceFeedId;
+ public FeedConnectionId feedConnectionId;
+ public List<String> collectLocations = new ArrayList<String>();
+ public List<String> computeLocations = new ArrayList<String>();
+ public List<String> storageLocations = new ArrayList<String>();
+ public Map<String, String> feedPolicy;
+ public String superFeedManagerHost;
+ public int superFeedManagerPort;
+ public boolean fullyConnected;
+
+ public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
+ JobId jobId, Map<String, String> feedPolicy) {
+ super(jobSpec, jobId, FeedInfoType.COLLECT);
+ this.sourceFeedId = sourceFeedId;
+ this.feedConnectionId = feedConnectionId;
+ this.feedPolicy = feedPolicy;
+ this.fullyConnected = true;
+ }
+
+ @Override
+ public String toString() {
+ return FeedInfoType.COLLECT + "[" + feedConnectionId + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
new file mode 100644
index 0000000..1af7153
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.io.Serializable;
+
+/**
+ * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a
+ * dataset.
+ */
+public class FeedConnectionId implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedId feedId; // Dataverse - Feed
+ private final String datasetName; // Dataset
+
+ public FeedConnectionId(FeedId feedId, String datasetName) {
+ this.feedId = feedId;
+ this.datasetName = datasetName;
+ }
+
+ public FeedConnectionId(String dataverse, String feedName, String datasetName) {
+ this.feedId = new FeedId(dataverse, feedName);
+ this.datasetName = datasetName;
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof FeedConnectionId)) {
+ return false;
+ }
+
+ if (this == o || (((FeedConnectionId) o).getFeedId().equals(feedId)
+ && ((FeedConnectionId) o).getDatasetName().equals(datasetName))) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return feedId.toString() + "-->" + datasetName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
new file mode 100644
index 0000000..dd2fc60
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedConnectionManager;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+/**
+ * An implementation of the {@link IFeedConnectionManager} interface.
+ * Provides a per-node registry that maps each feed connection to the {@link FeedRuntimeManager}
+ * holding the runtimes registered for that connection.
+ */
+public class FeedConnectionManager implements IFeedConnectionManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
+
+    private final Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
+    private final String nodeId;
+
+    /**
+     * @param nodeId
+     *            id of the node this manager belongs to; only used in {@link #toString()}
+     */
+    public FeedConnectionManager(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @param feedId
+     *            the feed connection id
+     * @return the runtime manager registered for the connection, or {@code null} if there is none
+     */
+    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
+        return feedRuntimeManagers.get(feedId);
+    }
+
+    /**
+     * Closes and removes the runtime manager of the given connection, if one is registered.
+     * <p>
+     * Failures are logged and not propagated. If closing fails, the manager stays registered because
+     * removal only happens after a successful close.
+     */
+    @Override
+    public void deregisterFeed(FeedConnectionId feedId) {
+        try {
+            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+            if (mgr != null) {
+                mgr.close();
+                feedRuntimeManagers.remove(feedId);
+            }
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                // Pass the exception along so its type and stack trace end up in the log.
+                LOGGER.log(Level.WARNING, "Exception in closing feed runtime: " + e.getMessage(), e);
+            }
+        }
+
+    }
+
+    /**
+     * Registers a runtime under its connection, creating the connection's runtime manager on first use.
+     */
+    @Override
+    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
+            throws Exception {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        if (runtimeMgr == null) {
+            runtimeMgr = new FeedRuntimeManager(connectionId, this);
+            feedRuntimeManagers.put(connectionId, runtimeMgr);
+        }
+        runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
+    }
+
+    /**
+     * Removes a runtime from its connection's runtime manager; does nothing if the connection is unknown.
+     */
+    @Override
+    public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        if (runtimeMgr != null) {
+            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
+        }
+    }
+
+    /**
+     * @return whatever the connection's runtime manager returns for {@code feedRuntimeId}, or
+     *         {@code null} if the connection is unknown
+     */
+    @Override
+    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
+    }
+
+    @Override
+    public String toString() {
+        return "FeedManager " + "[" + nodeId + "]";
+    }
+
+    /**
+     * @return a new list with the runtime ids of all connections known to this manager
+     */
+    @Override
+    public List<FeedRuntimeId> getRegisteredRuntimes() {
+        List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
+        for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
+            runtimes.addAll(entry.getValue().getFeedRuntimes());
+        }
+        return runtimes;
+    }
+}