You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:03:58 UTC
[5/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
deleted file mode 100644
index 22dcfac..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Collects storage-side tracking information for feed records: which record ids of each
- * intake partition have been acknowledged, and the average intake-to-storage delay of the
- * most recently processed non-empty frame.
- */
-public class StorageFrameHandler {
-
-    // partition -> (ack-window base -> acknowledgement statistics of that window)
-    private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
-    private long avgDelayPersistence;
-
-    public StorageFrameHandler() {
-        intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
-        avgDelayPersistence = 0L;
-    }
-
-    /**
-     * Acknowledges every tuple of {@code frame}, writes the current time into each tuple's
-     * store-timestamp slot, and records the frame's average intake-to-storage delay.
-     * An empty frame is ignored and leaves the previously recorded average untouched.
-     *
-     * @param frame         the frame whose tuples are inspected and stamped in place
-     * @param frameAccessor accessor that supplies the tuple count and tuple offsets of {@code frame}
-     */
-    public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
-        int nTuples = frameAccessor.getTupleCount();
-        if (nTuples == 0) {
-            // Nothing to acknowledge; returning here also avoids the division by zero below.
-            return;
-        }
-        long delay = 0;
-        long currentTime = System.currentTimeMillis();
-        for (int i = 0; i < nTuples; i++) {
-            int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
-            // NOTE(review): the fixed offsets below presumably mirror the serialized record layout
-            // (open-part offset at +6, 8 bytes per open-field slot, name length + 2 + 1 type tag
-            // before each value) -- confirm against the record serializer.
-            int openPartOffsetOrig = frame.getInt(recordStart + 6);
-            int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
-            int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                    + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-            int recordId = frame.getInt(recordStart + recordIdOffset);
-
-            int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
-            int partition = frame.getInt(recordStart + partitionOffset);
-
-            ackRecordId(partition, recordId);
-
-            int intakeTimestampValueOffset = partitionOffset + 4
-                    + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
-            long intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
-
-            int storeTimestampValueOffset = intakeTimestampValueOffset + 8
-                    + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
-            frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
-            delay += currentTime - intakeTimestamp;
-        }
-        avgDelayPersistence = delay / nTuples;
-    }
-
-    /**
-     * Marks {@code recordId} as acknowledged in the statistics of its ack window, creating the
-     * per-partition map and the window statistics on first use.
-     */
-    private void ackRecordId(int partition, int recordId) {
-        Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
-        if (map == null) {
-            map = new HashMap<Integer, IntakePartitionStatistics>();
-            intakeStatistics.put(partition, map);
-        }
-        int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
-        IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
-        if (intakeStatsForBaseOfPartition == null) {
-            intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
-            map.put(base, intakeStatsForBaseOfPartition);
-        }
-        intakeStatsForBaseOfPartition.ackRecordId(recordId);
-    }
-
-    /**
-     * Returns the ack info of the given window, or {@code null} when nothing has been recorded
-     * for that partition/base pair. Synchronized because it reads the maps that
-     * {@link #updateTrackingInformation} mutates under this object's lock.
-     */
-    public synchronized byte[] getAckData(int partition, int base) {
-        Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
-        if (intakeStats != null) {
-            IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
-            if (intakePartitionStats != null) {
-                return intakePartitionStats.getAckInfo();
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Returns a snapshot of the per-window statistics of {@code partition}. The returned map is
-     * a copy, so callers can iterate over it without holding this object's lock; it is empty
-     * when the partition has no statistics yet.
-     */
-    public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
-        Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
-        if (intakeStatsForPartition == null) {
-            return new HashMap<Integer, IntakePartitionStatistics>();
-        }
-        // Hand out the copy, not the live internal map.
-        return new HashMap<Integer, IntakePartitionStatistics>(intakeStatsForPartition);
-    }
-
-    public long getAvgDelayPersistence() {
-        return avgDelayPersistence;
-    }
-
-    public void setAvgDelayPersistence(long avgDelayPersistence) {
-        this.avgDelayPersistence = avgDelayPersistence;
-    }
-
-    /** Returns the key-set view of the partitions that currently have statistics. */
-    public Set<Integer> getPartitionsWithStats() {
-        return intakeStatistics.keySet();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
new file mode 100644
index 0000000..1bdc7e1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class SyncFeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final FeedExceptionHandler exceptionHandler;
+
+ public SyncFeedRuntimeInputHandler(IHyracksTaskContext ctx, IFrameWriter writer, FrameTupleAccessor fta) {
+ this.writer = writer;
+ this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ while (frame != null) {
+ try {
+ writer.nextFrame(frame);
+ return;
+ } catch (HyracksDataException e) {
+ frame = exceptionHandler.handle(e, frame);
+ if (frame == null) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
new file mode 100644
index 0000000..25aa86a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.external.feed.dataflow.FrameAction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A budget-bounded pool of {@link ByteBuffer} frames shared by concurrent callers.
+ * <p>
+ * The budget is expressed in units of the default frame size. Frames of the default size are
+ * recycled through {@code pool}; larger frames (integral multiples of the default size) are
+ * recycled through {@code largeFramesPools}, keyed by their multiple. {@code handedOut}
+ * counts the units currently held by callers and {@code created} counts the units currently
+ * allocated (held by callers or idle in a pool). Callers that cannot be served may subscribe
+ * a {@link FrameAction}, which is invoked once a release frees enough budget.
+ * <p>
+ * All access to the mutable counters and pools goes through this object's monitor.
+ */
+public class ConcurrentFramePool {
+    private static final String ERROR_INVALID_FRAME_SIZE =
+            "The size should be an integral multiple of the default frame size";
+    private final String nodeId;
+    private final int budget;
+    private final int defaultFrameSize;
+    private final ArrayDeque<ByteBuffer> pool;
+    private final ArrayDeque<FrameAction> subscribers = new ArrayDeque<>();
+    private final Map<Integer, ArrayDeque<ByteBuffer>> largeFramesPools;
+    private int handedOut;
+    private int created;
+
+    /**
+     * @param nodeId        identifier used only in {@link #toString()}
+     * @param budgetInBytes total memory budget; rounded down to whole frames
+     * @param frameSize     size in bytes of a default frame
+     */
+    public ConcurrentFramePool(String nodeId, long budgetInBytes, int frameSize) {
+        this.nodeId = nodeId;
+        this.defaultFrameSize = frameSize;
+        this.budget = (int) (budgetInBytes / frameSize);
+        this.pool = new ArrayDeque<>(budget);
+        this.largeFramesPools = new HashMap<>();
+    }
+
+    /**
+     * Hands out one frame of the default size.
+     *
+     * @return the frame, or {@code null} when the budget is exhausted
+     */
+    public synchronized ByteBuffer get() {
+        if (handedOut < budget) {
+            handedOut++;
+            return allocate();
+        }
+        return null;
+    }
+
+    /**
+     * Returns the number of default-size frames that can still be handed out. Synchronized so
+     * that the value reflects updates made by other threads under this object's monitor.
+     */
+    public synchronized int remaining() {
+        return budget - handedOut;
+    }
+
+    /**
+     * Hands out one frame of {@code bufferSize} bytes.
+     *
+     * @param bufferSize requested size; must be a non-negative integral multiple of the
+     *                   default frame size
+     * @return the frame, or {@code null} when the remaining budget is insufficient
+     * @throws HyracksDataException if {@code bufferSize} is negative or not a multiple of the
+     *                              default frame size
+     */
+    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+        // Reject negative sizes up front: a negative multiple would otherwise decrement the
+        // accounting counters before ByteBuffer.allocate rejects the size.
+        if (bufferSize < 0 || bufferSize % defaultFrameSize != 0) {
+            throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
+        }
+        int multiplier = bufferSize / defaultFrameSize;
+        if (multiplier == 1) {
+            // Default-size frames are recycled through 'pool' (see release), so serve them
+            // from there instead of always allocating a fresh buffer.
+            return get();
+        }
+        if (handedOut + multiplier > budget) {
+            // Not enough budget
+            return null;
+        }
+        handedOut += multiplier;
+        ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
+        if (largeFramesPool != null && !largeFramesPool.isEmpty()) {
+            return largeFramesPool.poll();
+        }
+        if (created + multiplier > budget) {
+            // Budget is tied up in idle frames of other sizes; discard enough of them first.
+            freeup(multiplier);
+        }
+        created += multiplier;
+        return ByteBuffer.allocate(bufferSize);
+    }
+
+    /**
+     * Discards idle pooled frames until {@code desiredNumberOfFreePages} further units can be
+     * created without exceeding the budget, and reduces {@code created} accordingly. Large
+     * frames of other sizes are discarded first, then default-size frames.
+     *
+     * @return the number of frame units discarded
+     */
+    private int freeup(int desiredNumberOfFreePages) {
+        int needToFree = desiredNumberOfFreePages - (budget - created);
+        int freed = 0;
+        // start by large frames
+        for (Iterator<Entry<Integer, ArrayDeque<ByteBuffer>>> it = largeFramesPools.entrySet().iterator(); it
+                .hasNext();) {
+            Entry<Integer, ArrayDeque<ByteBuffer>> entry = it.next();
+            int frameUnits = entry.getKey();
+            if (frameUnits == desiredNumberOfFreePages) {
+                // Frames of the requested size are left alone (the caller found none idle).
+                continue;
+            }
+            ArrayDeque<ByteBuffer> idleFrames = entry.getValue();
+            while (!idleFrames.isEmpty()) {
+                idleFrames.pop();
+                freed += frameUnits;
+                if (freed >= needToFree) {
+                    // created is handled here
+                    created -= freed;
+                    return freed;
+                }
+            }
+            it.remove();
+        }
+        // freed all large pages. need to free small pages as well
+        needToFree -= freed;
+        while (needToFree > 0) {
+            pool.pop();
+            needToFree--;
+            freed++;
+        }
+        created -= freed;
+        return freed;
+    }
+
+    /**
+     * Returns an idle default-size frame, or allocates a new one (discarding idle large frames
+     * first when every budgeted unit has already been created). Budget accounting in
+     * {@code handedOut} is the caller's responsibility.
+     */
+    private ByteBuffer allocate() {
+        if (pool.isEmpty()) {
+            if (created == budget) {
+                freeup(1);
+            }
+            created++;
+            return ByteBuffer.allocate(defaultFrameSize);
+        } else {
+            return pool.pop();
+        }
+    }
+
+    /**
+     * Adds {@code count} default-size frames to {@code buffers}, all or nothing.
+     *
+     * @return {@code true} if the frames were added, {@code false} if the budget cannot cover
+     *         {@code count} frames (in which case {@code buffers} is untouched)
+     * @throws IllegalArgumentException if {@code count} is negative
+     */
+    public synchronized boolean get(Collection<ByteBuffer> buffers, int count) {
+        if (count < 0) {
+            // A negative count would silently lower handedOut and corrupt the budget.
+            throw new IllegalArgumentException("count must be non-negative: " + count);
+        }
+        if (handedOut + count <= budget) {
+            handedOut += count;
+            for (int i = 0; i < count; i++) {
+                buffers.add(allocate());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ConcurrentFramePool [" + nodeId + "]" + "(consumed:" + handedOut + "/" + budget + ")";
+    }
+
+    /** Releases every buffer of {@code buffers}; see {@link #release(ByteBuffer)}. */
+    public synchronized void release(Collection<ByteBuffer> buffers) throws HyracksDataException {
+        for (ByteBuffer buffer : buffers) {
+            release(buffer);
+        }
+    }
+
+    /**
+     * Returns {@code buffer} to the pool matching its capacity and then serves waiting
+     * subscribers, in subscription order, for as long as the freed budget covers the
+     * subscriber at the head of the queue.
+     *
+     * @throws HyracksDataException if a subscriber's action fails; that subscriber has been
+     *                              removed from the queue by then
+     */
+    public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
+        int multiples = buffer.capacity() / defaultFrameSize;
+        handedOut -= multiples;
+        if (multiples == 1) {
+            pool.add(buffer);
+        } else {
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiples);
+            if (largeFramesPool == null) {
+                largeFramesPool = new ArrayDeque<>();
+                largeFramesPools.put(multiples, largeFramesPool);
+            }
+            largeFramesPool.push(buffer);
+        }
+        // check subscribers
+        while (!subscribers.isEmpty()) {
+            FrameAction frameAction = subscribers.peek();
+            ByteBuffer granted = acquireFor(frameAction);
+            if (granted == null) {
+                // Head of the queue still cannot be served; keep the order and stop.
+                break;
+            }
+            try {
+                frameAction.call(granted);
+            } finally {
+                subscribers.remove();
+            }
+        }
+    }
+
+    /**
+     * Serves {@code frameAction} immediately when no one is waiting and the budget allows it;
+     * otherwise queues it to be served by a later release.
+     *
+     * @return {@code false} if the action was invoked immediately, {@code true} if it was queued
+     */
+    public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
+        // Only try to answer immediately when nobody is already waiting.
+        if (subscribers.isEmpty()) {
+            ByteBuffer buffer = acquireFor(frameAction);
+            if (buffer != null) {
+                frameAction.call(buffer);
+                // There is no need to subscribe. perform action and return false
+                return false;
+            }
+        }
+        // none of the above, add to subscribers and return true
+        subscribers.add(frameAction);
+        return true;
+    }
+
+    /**
+     * Tries to obtain a frame of the size {@code frameAction} asks for.
+     *
+     * @return the frame, or {@code null} when the budget cannot cover it
+     */
+    private ByteBuffer acquireFor(FrameAction frameAction) throws HyracksDataException {
+        if (frameAction.getSize() == defaultFrameSize) {
+            return get();
+        }
+        return get(frameAction.getSize());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
index 1af7153..13f19b8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -29,16 +29,17 @@ public class FeedConnectionId implements Serializable {
private static final long serialVersionUID = 1L;
private final FeedId feedId; // Dataverse - Feed
- private final String datasetName; // Dataset
+ private final String datasetName; // Dataset <Dataset is empty in case of no target dataset>
+ private final int hash;
public FeedConnectionId(FeedId feedId, String datasetName) {
this.feedId = feedId;
this.datasetName = datasetName;
+ this.hash = toString().hashCode();
}
public FeedConnectionId(String dataverse, String feedName, String datasetName) {
- this.feedId = new FeedId(dataverse, feedName);
- this.datasetName = datasetName;
+ this(new FeedId(dataverse, feedName), datasetName);
}
public FeedId getFeedId() {
@@ -64,7 +65,7 @@ public class FeedConnectionId implements Serializable {
@Override
public int hashCode() {
- return toString().hashCode();
+ return hash;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
index dd2fc60..a746ef4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
@@ -46,6 +46,7 @@ public class FeedConnectionManager implements IFeedConnectionManager {
this.nodeId = nodeId;
}
+ @Override
public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
return feedRuntimeManagers.get(feedId);
}
@@ -67,8 +68,7 @@ public class FeedConnectionManager implements IFeedConnectionManager {
}
@Override
- public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
- throws Exception {
+ public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) {
FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
if (runtimeMgr == null) {
runtimeMgr = new FeedRuntimeManager(connectionId, this);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
index 5095e7d..62f4c6c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
@@ -18,22 +18,14 @@
*/
package org.apache.asterix.external.feed.management;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.common.config.AsterixFeedProperties;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.feed.api.IFeedMetadataManager;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.feed.message.FeedMessageService;
-import org.apache.asterix.external.feed.watch.FeedMetricCollector;
-import org.apache.asterix.external.feed.watch.NodeLoadReportService;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -41,23 +33,13 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
* Provider necessary central repository for registering/retrieving
* artifacts/services associated with a feed.
*/
-public class FeedManager implements IFeedManager {
+public class FeedManager {
- private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
-
- private final IFeedSubscriptionManager feedSubscriptionManager;
+ private final Map<FeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
private final IFeedConnectionManager feedConnectionManager;
- private final IFeedMemoryManager feedMemoryManager;
-
- private final IFeedMetricCollector feedMetricCollector;
-
- private final IFeedMetadataManager feedMetadataManager;
-
- private final IFeedMessageService feedMessageService;
-
- private final NodeLoadReportService nodeLoadReportService;
+ private final ConcurrentFramePool feedMemoryManager;
private final AsterixFeedProperties asterixFeedProperties;
@@ -68,60 +50,39 @@ public class FeedManager implements IFeedManager {
public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize)
throws AsterixException, HyracksDataException {
this.nodeId = nodeId;
- this.feedSubscriptionManager = new FeedSubscriptionManager(nodeId);
this.feedConnectionManager = new FeedConnectionManager(nodeId);
- this.feedMetadataManager = new FeedMetadataManager(nodeId);
- this.feedMemoryManager = new FeedMemoryManager(nodeId, feedProperties, frameSize);
- String ccClusterIp = AsterixClusterProperties.INSTANCE.getCluster() != null
- ? AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp() : "localhost";
- this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
- this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
- try {
- this.feedMessageService.start();
- this.nodeLoadReportService.start();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start feed services " + e.getMessage());
- }
- e.printStackTrace();
- }
- this.feedMetricCollector = new FeedMetricCollector(nodeId);
+ this.feedMemoryManager =
+ new ConcurrentFramePool(nodeId, feedProperties.getMemoryComponentGlobalBudget(), frameSize);
this.frameSize = frameSize;
this.asterixFeedProperties = feedProperties;
+ this.subscribableRuntimes = new ConcurrentHashMap<FeedRuntimeId, ISubscribableRuntime>();
}
- @Override
- public IFeedSubscriptionManager getFeedSubscriptionManager() {
- return feedSubscriptionManager;
- }
-
- @Override
public IFeedConnectionManager getFeedConnectionManager() {
return feedConnectionManager;
}
- @Override
- public IFeedMemoryManager getFeedMemoryManager() {
+ public ConcurrentFramePool getFeedMemoryManager() {
return feedMemoryManager;
}
- @Override
- public IFeedMetricCollector getFeedMetricCollector() {
- return feedMetricCollector;
- }
-
public int getFrameSize() {
return frameSize;
}
- @Override
- public IFeedMetadataManager getFeedMetadataManager() {
- return feedMetadataManager;
+ /**
+ * Registers {@code subscribableRuntime} under its runtime id unless a runtime is already
+ * registered for that id, in which case the existing registration is kept.
+ */
+ public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
+ // putIfAbsent is a single atomic step on the ConcurrentHashMap, unlike a separate
+ // containsKey/put pair, so the first registration wins even with concurrent callers.
+ subscribableRuntimes.putIfAbsent(subscribableRuntime.getRuntimeId(), subscribableRuntime);
}
- @Override
- public IFeedMessageService getFeedMessageService() {
- return feedMessageService;
+ public void deregisterFeedSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
+ subscribableRuntimes.remove(subscribableRuntimeId);
+ }
+
+ public ISubscribableRuntime getSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
+ return subscribableRuntimes.get(subscribableRuntimeId);
}
@Override
@@ -129,7 +90,6 @@ public class FeedManager implements IFeedManager {
return "FeedManager " + "[" + nodeId + "]";
}
- @Override
public AsterixFeedProperties getAsterixFeedProperties() {
return asterixFeedProperties;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java
deleted file mode 100644
index de9d22c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.external.feed.dataflow.DataBucketPool;
-import org.apache.asterix.external.feed.dataflow.FrameCollection;
-
-/**
- * Hands out feed memory components and tracks, in frames, how much of a fixed budget has
- * been committed to them.
- */
-public class FeedMemoryManager implements IFeedMemoryManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMemoryManager.class.getName());
-    private static final int ALLOCATION_INCREMENT = 10;
-
-    private final AtomicInteger componentId = new AtomicInteger(0);
-    private final String nodeId;
-    private final int budget;
-    private final int frameSize;
-
-    private int committed;
-
-    /**
-     * @param nodeId         identifier used only in {@link #toString()}
-     * @param feedProperties source of the global memory budget
-     * @param frameSize      size of one frame; the budget is divided by it to get frames
-     */
-    public FeedMemoryManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) {
-        this.nodeId = nodeId;
-        this.frameSize = frameSize;
-        // Divide first, then narrow: casting the budget itself to int would truncate any
-        // budget above Integer.MAX_VALUE before the division.
-        budget = (int) (feedProperties.getMemoryComponentGlobalBudget() / frameSize);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Feed Memory budget " + budget + " frames (frame size=" + frameSize + ")");
-        }
-    }
-
-    /**
-     * Creates a memory component of the requested type if its starting size fits in the
-     * remaining budget.
-     *
-     * @return the new component, or {@code null} when the budget cannot cover it
-     */
-    @Override
-    public synchronized IFeedMemoryComponent getMemoryComponent(Type type) {
-        IFeedMemoryComponent memoryComponent = null;
-        boolean valid = false;
-        switch (type) {
-            case COLLECTION:
-                valid = committed + START_COLLECTION_SIZE <= budget;
-                if (valid) {
-                    // NOTE(review): unlike POOL, nothing is added to 'committed' here --
-                    // confirm whether FrameCollection accounts for its own starting size.
-                    memoryComponent = new FrameCollection(componentId.incrementAndGet(), this, START_COLLECTION_SIZE);
-                }
-                break;
-            case POOL:
-                valid = committed + START_POOL_SIZE <= budget;
-                if (valid) {
-                    memoryComponent = new DataBucketPool(componentId.incrementAndGet(), this, START_POOL_SIZE,
-                            frameSize);
-                    // Commit the budget only for a pool that was actually created; charging a
-                    // refused request would leak budget that is never released.
-                    committed += START_POOL_SIZE;
-                }
-                break;
-        }
-        if (!valid) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to allocate memory component of type" + type);
-            }
-        }
-        return valid ? memoryComponent : null;
-    }
-
-    /**
-     * Grows {@code memoryComponent} by a fixed increment if the budget allows it.
-     *
-     * @return {@code true} if the component was expanded, {@code false} if the budget is exhausted
-     */
-    @Override
-    public synchronized boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        if (committed + ALLOCATION_INCREMENT > budget) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Memory budget " + budget + " is exhausted. Space left: " + (budget - committed)
-                        + " frames.");
-            }
-            return false;
-        } else {
-            memoryComponent.expand(ALLOCATION_INCREMENT);
-            committed += ALLOCATION_INCREMENT;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Expanded memory component " + memoryComponent + " by " + ALLOCATION_INCREMENT + " " + this);
-            }
-            return true;
-        }
-    }
-
-    /**
-     * Returns the component's total allocation to the budget and resets the component.
-     */
-    @Override
-    public synchronized void releaseMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        int delta = memoryComponent.getTotalAllocation();
-        committed -= delta;
-        memoryComponent.reset();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Reset " + memoryComponent + " and reclaimed " + delta + " frames " + this);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMemoryManager [" + nodeId + "]" + "(" + committed + "/" + budget + ")";
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java
deleted file mode 100644
index 34ae461..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.Date;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMetadataManager;
-import org.apache.asterix.external.feed.message.XAQLFeedMessage;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedMetadataManager implements IFeedMetadataManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
-
- private final String nodeId;
- private ARecordType recordType;
-
- public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
- this.nodeId = nodeId;
- String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
- "timestamp" };
- IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
-
- recordType = new ARecordType(FeedConstants.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
- }
-
- @Override
- public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
- throws AsterixException {
- try {
- AString id = new AString("1");
- AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
- AString feedValue = new AString(connectionId.getFeedId().getFeedName());
- AString targetDatasetValue = new AString(connectionId.getDatasetName());
- AString tupleValue = new AString(tuple);
- AString messageValue = new AString(message);
- AString dateTime = new AString(new Date().toString());
-
- IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
- messageValue, dateTime };
- ARecord record = new ARecord(recordType, fields);
- StringBuilder builder = new StringBuilder();
- builder.append("use dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
- builder.append("insert into dataset " + FeedConstants.FAILED_TUPLE_DATASET + " ");
- builder.append(" (" + recordToString(record) + ")");
- builder.append(";");
-
- XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
- feedManager.getFeedMessageService().sendMessage(xAqlMessage);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" Sent " + xAqlMessage.toJSON());
- }
- } catch (Exception pe) {
- throw new AsterixException(pe);
- }
- }
-
- @Override
- public String toString() {
- return "FeedMetadataManager [" + nodeId + "]";
- }
-
- private String recordToString(ARecord record) {
- String[] fieldNames = record.getType().getFieldNames();
- StringBuilder sb = new StringBuilder();
- sb.append("{ ");
- for (int i = 0; i < fieldNames.length; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append("\"" + fieldNames[i] + "\"");
- sb.append(": ");
- switch (record.getType().getFieldTypes()[i].getTypeTag()) {
- case STRING:
- sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
- break;
- default:
- break;
- }
- }
- sb.append(" }");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java
deleted file mode 100644
index e402f92..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-
-public class FeedSubscriptionManager implements IFeedSubscriptionManager {
-
- private static Logger LOGGER = Logger.getLogger(FeedSubscriptionManager.class.getName());
-
- private final String nodeId;
-
- private final Map<SubscribableFeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
-
- public FeedSubscriptionManager(String nodeId) {
- this.nodeId = nodeId;
- this.subscribableRuntimes = new HashMap<SubscribableFeedRuntimeId, ISubscribableRuntime>();
- }
-
- @Override
- public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
- SubscribableFeedRuntimeId sid = (SubscribableFeedRuntimeId) subscribableRuntime.getRuntimeId();
- if (!subscribableRuntimes.containsKey(sid)) {
- subscribableRuntimes.put(sid, subscribableRuntime);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered feed subscribable runtime " + subscribableRuntime);
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Feed ingestion runtime " + subscribableRuntime + " already registered.");
- }
- }
- }
-
- @Override
- public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableFeedRuntimeId) {
- return subscribableRuntimes.get(subscribableFeedRuntimeId);
- }
-
- @Override
- public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId ingestionId) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("De-registered feed subscribable runtime " + ingestionId);
- }
- subscribableRuntimes.remove(ingestionId);
- }
-
- @Override
- public String toString() {
- return "IngestionManager [" + nodeId + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java
deleted file mode 100644
index 6c924d2..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedConstants.MessageConstants;
-
-public class FeedCongestionMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private final FeedRuntimeId runtimeId;
- private int inflowRate;
- private int outflowRate;
- private Mode mode;
-
- public FeedCongestionMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, int inflowRate,
- int outflowRate, Mode mode) {
- super(MessageType.CONGESTION);
- this.connectionId = connectionId;
- this.runtimeId = runtimeId;
- this.inflowRate = inflowRate;
- this.outflowRate = outflowRate;
- this.mode = mode;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
- obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
- obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
- obj.put(FeedConstants.MessageConstants.INFLOW_RATE, inflowRate);
- obj.put(FeedConstants.MessageConstants.OUTFLOW_RATE, outflowRate);
- obj.put(FeedConstants.MessageConstants.MODE, mode);
- return obj;
- }
-
- public FeedRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
- public int getInflowRate() {
- return inflowRate;
- }
-
- public int getOutflowRate() {
- return outflowRate;
- }
-
- public static FeedCongestionMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
- .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
- obj.getInt(FeedConstants.MessageConstants.PARTITION),
- obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
- Mode mode = Mode.valueOf(obj.getString(MessageConstants.MODE));
- return new FeedCongestionMessage(connectionId, runtimeId,
- obj.getInt(FeedConstants.MessageConstants.INFLOW_RATE),
- obj.getInt(FeedConstants.MessageConstants.OUTFLOW_RATE), mode);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public Mode getMode() {
- return mode;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java
deleted file mode 100644
index 13d6622..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.util.FeedConstants;
-
-/**
- * Sends feed report messages on behalf of an operator instance
- * to the SuperFeedManager associated with the feed.
- */
-public class FeedMessageService implements IFeedMessageService {
-
- private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
-
- private final LinkedBlockingQueue<String> inbox;
- private final FeedMessageHandler mesgHandler;
- private final String nodeId;
- private ExecutorService executor;
-
- public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
- this.inbox = new LinkedBlockingQueue<String>();
- this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
- this.nodeId = nodeId;
- this.executor = Executors.newSingleThreadExecutor();
- }
-
- public void start() throws Exception {
-
- executor.execute(mesgHandler);
- }
-
- public void stop() {
- synchronized (mesgHandler.getLock()) {
- executor.shutdownNow();
- }
- mesgHandler.stop();
- }
-
- @Override
- public void sendMessage(IFeedMessage message) {
- try {
- JSONObject obj = message.toJSON();
- obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
- inbox.add(obj.toString());
- } catch (JSONException jse) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("JSON exception in parsing message " + message + " exception [" + jse.getMessage() + "]");
- }
- }
- }
-
- private static class FeedMessageHandler implements Runnable {
-
- private final LinkedBlockingQueue<String> inbox;
- private final String host;
- private final int port;
- private final Object lock;
-
- private Socket cfmSocket;
-
- private static final byte[] EOL = "\n".getBytes();
-
- public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
- this.inbox = inbox;
- this.host = host;
- this.port = port;
- this.lock = new Object();
- }
-
- public void run() {
- try {
- cfmSocket = new Socket(host, port);
- if (cfmSocket != null) {
- while (true) {
- String message = inbox.take();
- synchronized (lock) { // lock prevents message handler from sending incomplete message midst shutdown attempt
- cfmSocket.getOutputStream().write(message.getBytes());
- cfmSocket.getOutputStream().write(EOL);
- }
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start feed message service");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
- }
- } finally {
- stop();
- }
-
- }
-
- public void stop() {
- if (cfmSocket != null) {
- try {
- cfmSocket.close();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in closing socket " + e.getMessage());
- }
- }
- }
- }
-
- public Object getLock() {
- return lock;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java
deleted file mode 100644
index 1b8c45d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedConstants.MessageConstants;
-
-public class FeedReportMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private final FeedRuntimeId runtimeId;
- private final ValueType valueType;
- private int value;
-
- public FeedReportMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType, int value) {
- super(MessageType.FEED_REPORT);
- this.connectionId = connectionId;
- this.runtimeId = runtimeId;
- this.valueType = valueType;
- this.value = value;
- }
-
- public void reset(int value) {
- this.value = value;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
- obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
- obj.put(FeedConstants.MessageConstants.VALUE_TYPE, valueType);
- obj.put(FeedConstants.MessageConstants.VALUE, value);
- return obj;
- }
-
- public static FeedReportMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
- .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
- obj.getInt(FeedConstants.MessageConstants.PARTITION), FeedConstants.MessageConstants.NOT_APPLICABLE);
- ValueType type = ValueType.valueOf(obj.getString(MessageConstants.VALUE_TYPE));
- int value = Integer.parseInt(obj.getString(MessageConstants.VALUE));
- return new FeedReportMessage(connectionId, runtimeId, type, value);
- }
-
- public int getValue() {
- return value;
- }
-
- public void setValue(int value) {
- this.value = value;
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public FeedRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
- public ValueType getValueType() {
- return valueType;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java
deleted file mode 100644
index 61e26de..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.util.FeedConstants;
-
-public class FeedTupleCommitAckMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private int intakePartition;
- private int base;
- private byte[] commitAcks;
-
- public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
- super(MessageType.COMMIT_ACK);
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.base = base;
- this.commitAcks = commitAcks;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
- obj.put(FeedConstants.MessageConstants.BASE, base);
- String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
- obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
- return obj;
- }
-
- public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
- int base = obj.getInt(FeedConstants.MessageConstants.BASE);
- String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
- byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
- return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getIntakePartition() {
- return intakePartition;
- }
-
- public byte[] getCommitAcks() {
- return commitAcks;
- }
-
- public void reset(int intakePartition, int base, byte[] commitAcks) {
- this.intakePartition = intakePartition;
- this.base = base;
- this.commitAcks = commitAcks;
- }
-
- public int getBase() {
- return base;
- }
-
- public void setBase(int base) {
- this.base = base;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java
deleted file mode 100644
index a61dc06..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.util.FeedConstants;
-
-public class FeedTupleCommitResponseMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
- private final int intakePartition;
- private final int maxWindowAcked;
-
- public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
- super(MessageType.COMMIT_ACK_RESPONSE);
- this.connectionId = connectionId;
- this.intakePartition = intakePartition;
- this.maxWindowAcked = maxWindowAcked;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
- obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
- return obj;
- }
-
- @Override
- public String toString() {
- return connectionId + "[" + intakePartition + "]" + "(" + maxWindowAcked + ")";
- }
-
- public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
- FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
- obj.getString(FeedConstants.MessageConstants.FEED));
- FeedConnectionId connectionId = new FeedConnectionId(feedId,
- obj.getString(FeedConstants.MessageConstants.DATASET));
- int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
- int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
- return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getMaxWindowAcked() {
- return maxWindowAcked;
- }
-
- public int getIntakePartition() {
- return intakePartition;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java
deleted file mode 100644
index 67f2884..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class MessageListener {
-
- private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
-
- private int port;
- private final LinkedBlockingQueue<String> outbox;
-
- private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- private MessageListenerServer listenerServer;
-
- public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- listenerServer.stop();
- if (!executorService.isShutdown()) {
- executorService.shutdownNow();
- }
- }
-
- public void start() throws IOException {
- listenerServer = new MessageListenerServer(port, outbox);
- executorService.execute(listenerServer);
- }
-
- private static class MessageListenerServer implements Runnable {
-
- private final int port;
- private final LinkedBlockingQueue<String> outbox;
- private ServerSocket server;
-
- private static final char EOL = (char) "\n".getBytes()[0];
-
- public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- Socket client = null;
- try {
- server = new ServerSocket(port);
- client = server.accept();
- InputStream in = client.getInputStream();
- CharBuffer buffer = CharBuffer.allocate(5000);
- char ch;
- while (true) {
- ch = (char) in.read();
- if (((int) ch) == -1) {
- break;
- }
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- synchronized (outbox) {
- outbox.add(s + "\n");
- }
- buffer.position(0);
- buffer.limit(5000);
- }
-
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start Message listener" + server);
- }
- } finally {
- if (server != null) {
- try {
- server.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
deleted file mode 100644
index 6ed176a..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IMessageReceiver;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
-
- protected static final Logger LOGGER = Logger.getLogger(MessageReceiver.class.getName());
-
- protected final ArrayBlockingQueue<T> inbox;
- protected ExecutorService executor;
-
- public MessageReceiver() {
- inbox = new ArrayBlockingQueue<T>(2);
- }
-
- public abstract void processMessage(T message) throws Exception;
-
- @Override
- public void start() {
- executor = Executors.newSingleThreadExecutor();
- executor.execute(new MessageReceiverRunnable<T>(this));
- }
-
- @Override
- public synchronized void sendMessage(T message) throws InterruptedException {
- inbox.put(message);
- }
-
- @Override
- public void close(boolean processPending) {
- if (executor != null) {
- executor.shutdown();
- executor = null;
- if (processPending) {
- flushPendingMessages();
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Will discard the pending frames " + inbox.size());
- }
- }
- }
- }
-
- private static class MessageReceiverRunnable<T> implements Runnable {
-
- private final ArrayBlockingQueue<T> inbox;
- private final MessageReceiver<T> messageReceiver;
-
- public MessageReceiverRunnable(MessageReceiver<T> messageReceiver) {
- this.inbox = messageReceiver.inbox;
- this.messageReceiver = messageReceiver;
- }
- // TODO: this should handle exceptions better
-
- @Override
- public void run() {
- while (true) {
- try {
- T message = inbox.poll();
- if (message == null) {
- messageReceiver.emptyInbox();
- message = inbox.take();
- }
- messageReceiver.processMessage(message);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- protected void flushPendingMessages() {
- while (!inbox.isEmpty()) {
- T message = null;
- try {
- message = inbox.take();
- processMessage(message);
- } catch (InterruptedException ie) {
- // ignore exception but break from the loop
- break;
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception " + e + " in processing message " + message);
- }
- }
- }
- }
-
- public abstract void emptyInbox() throws HyracksDataException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java
deleted file mode 100644
index 1548d6d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.util.FeedConstants;
-
-/**
- * Feed message of type {@code NODE_REPORT} holding three figures: a CPU load,
- * a used-heap amount and a runtime count.
- */
-public class NodeReportMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private double cpuLoad;
-    private double usedHeap;
-    private int nRuntimes;
-
-    /**
-     * @param cpuLoad   the CPU load figure (stored as a double)
-     * @param usedHeap  the used-heap figure (stored as a double)
-     * @param nRuntimes the runtime count
-     */
-    public NodeReportMessage(float cpuLoad, long usedHeap, int nRuntimes) {
-        super(IFeedMessage.MessageType.NODE_REPORT);
-        this.cpuLoad = cpuLoad;
-        this.usedHeap = usedHeap;
-        this.nRuntimes = nRuntimes;
-    }
-
-    /**
-     * Overwrites all three figures so the instance can be reused.
-     *
-     * @param cpuLoad   the new CPU load figure
-     * @param usedHeap  the new used-heap figure
-     * @param nRuntimes the new runtime count
-     */
-    public void reset(double cpuLoad, double usedHeap, int nRuntimes) {
-        this.nRuntimes = nRuntimes;
-        this.usedHeap = usedHeap;
-        this.cpuLoad = cpuLoad;
-    }
-
-    /**
-     * @return a JSON object with the CPU_LOAD, HEAP_USAGE and N_RUNTIMES entries
-     * @throws JSONException propagated from {@code JSONObject.put}
-     */
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        return new JSONObject()
-                .put(FeedConstants.MessageConstants.CPU_LOAD, cpuLoad)
-                .put(FeedConstants.MessageConstants.HEAP_USAGE, usedHeap)
-                .put(FeedConstants.MessageConstants.N_RUNTIMES, nRuntimes);
-    }
-
-    /** @return the CPU load figure */
-    public double getCpuLoad() {
-        return this.cpuLoad;
-    }
-
-    /** @return the used-heap figure */
-    public double getUsedHeap() {
-        return this.usedHeap;
-    }
-
-    /** @return the runtime count */
-    public int getnRuntimes() {
-        return this.nRuntimes;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java
deleted file mode 100644
index 76fe0c2..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedConstants;
-
-/**
- * A feed control message of type {@code PREPARE_STALL}. It identifies a feed
- * connection and carries a retain limit for compute partitions.
- * NOTE(review): the previous comment described this as an "end the feed"
- * message, which does not match the message type used here - confirm the
- * intended semantics with the sender of this message.
- */
-public class PrepareStallMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-
-    private final int computePartitionsRetainLimit;
-
-    /**
-     * @param connectionId                 the feed connection this message refers to
-     * @param computePartitionsRetainLimit the compute-partition retain limit to carry
-     */
-    public PrepareStallMessage(FeedConnectionId connectionId, int computePartitionsRetainLimit) {
-        super(MessageType.PREPARE_STALL);
-        this.computePartitionsRetainLimit = computePartitionsRetainLimit;
-        this.connectionId = connectionId;
-    }
-
-    /** @return the message type name followed by a space and the connection id */
-    @Override
-    public String toString() {
-        StringBuilder text = new StringBuilder(MessageType.PREPARE_STALL.name());
-        return text.append(" ").append(connectionId).toString();
-    }
-
-    /**
-     * @return a JSON object with the message type, dataverse, feed, dataset and
-     *         compute-partition retain limit entries
-     * @throws JSONException propagated from {@code JSONObject.put}
-     */
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        return new JSONObject()
-                .put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name())
-                .put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse())
-                .put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName())
-                .put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName())
-                .put(FeedConstants.MessageConstants.COMPUTE_PARTITION_RETAIN_LIMIT, computePartitionsRetainLimit);
-    }
-
-    /** @return the feed connection this message refers to */
-    public FeedConnectionId getConnectionId() {
-        return this.connectionId;
-    }
-
-    /** @return the compute-partition retain limit carried by this message */
-    public int getComputePartitionsRetainLimit() {
-        return this.computePartitionsRetainLimit;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
deleted file mode 100644
index 0749f82..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Connects to a remote host/port and forwards every newline-terminated
- * message read from that connection to an outbox queue.
- * <p>
- * {@link #start()} submits the reading loop to an internal executor;
- * {@link #stop()} shuts the executor down and closes the connection.
- */
-public class RemoteSocketMessageListener {
-
-    private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());
-
-    private final String host;
-    private final int port;
-    private final LinkedBlockingQueue<String> outbox;
-    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    private RemoteMessageListenerServer listenerServer;
-
-    /**
-     * @param host   the remote host to connect to
-     * @param port   the remote port to connect to
-     * @param outbox the queue that receives each message, terminated by {@code "\n"}
-     */
-    public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
-        this.host = host;
-        this.port = port;
-        this.outbox = outbox;
-    }
-
-    /**
-     * Shuts down the executor and closes the connection.
-     * Safe to call even if {@link #start()} was never invoked.
-     */
-    public void stop() {
-        if (!executorService.isShutdown()) {
-            executorService.shutdownNow();
-        }
-        // start() may never have been called; there is nothing to close then.
-        if (listenerServer != null) {
-            listenerServer.stop();
-        }
-    }
-
-    /**
-     * Creates the reader task and hands it to the executor. The socket itself
-     * is opened on the executor thread.
-     *
-     * @throws IOException declared for callers; not thrown by the visible code
-     */
-    public void start() throws IOException {
-        listenerServer = new RemoteMessageListenerServer(host, port, outbox);
-        executorService.execute(listenerServer);
-    }
-
-    /**
-     * Runnable that opens the connection and reads newline-delimited messages
-     * from it until the stream ends.
-     */
-    private static class RemoteMessageListenerServer implements Runnable {
-
-        private static final char EOL = '\n';
-
-        private final String host;
-        private final int port;
-        private final LinkedBlockingQueue<String> outbox;
-
-        // Written by the executor thread in run(), read by the thread calling stop().
-        private volatile Socket client;
-        private volatile boolean stopped;
-
-        public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
-            this.host = host;
-            this.port = port;
-            this.outbox = outbox;
-        }
-
-        /** Requests termination; closing the socket unblocks a pending read. */
-        public void stop() {
-            stopped = true;
-            closeQuietly(client);
-        }
-
-        @Override
-        public void run() {
-            try {
-                // Assign the field, not a shadowing local, so stop() can reach the socket.
-                client = new Socket(host, port);
-                if (stopped) {
-                    // stop() ran before the socket existed; the finally block closes it.
-                    return;
-                }
-                InputStream in = client.getInputStream();
-                StringBuilder line = new StringBuilder();
-                // Keep the value as an int: casting to char before the test turns
-                // the end-of-stream marker -1 into '\uffff' and hides it.
-                int next;
-                while ((next = in.read()) != -1) {
-                    if (next != EOL) {
-                        // Each byte is mapped 1:1 to a char, as the original code did.
-                        line.append((char) next);
-                        continue;
-                    }
-                    String message = line.append(EOL).toString();
-                    line.setLength(0);
-                    synchronized (outbox) {
-                        outbox.add(message);
-                    }
-                }
-                // An unterminated fragment left in 'line' at end of stream is dropped:
-                // only complete, newline-terminated messages are delivered.
-            } catch (Exception e) {
-                // A socket error after stop() is the expected way out of read().
-                if (!stopped && LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Unable to start Remote Message listener" + client, e);
-                }
-            } finally {
-                closeQuietly(client);
-            }
-        }
-
-        /** Closes {@code socket} if it is non-null and open, logging instead of propagating failures. */
-        private static void closeQuietly(Socket socket) {
-            if (socket == null || socket.isClosed()) {
-                return;
-            }
-            try {
-                socket.close();
-            } catch (IOException e) {
-                LOGGER.log(Level.WARNING, "Failed to close " + socket, e);
-            }
-        }
-    }
-
-    /** Implemented by components that expose a queue of received messages. */
-    public static interface IMessageAnalyzer {
-
-        /**
-         * @return the queue of messages
-         */
-        public LinkedBlockingQueue<String> getMessageQueue();
-
-    }
-
-}