You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 13:34:05 UTC
[3/7] asterixdb git commit: Refactor General Active Classes
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
deleted file mode 100644
index b8375e3..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public interface IFeedRuntime {
-
- public enum FeedRuntimeType {
- INTAKE,
- COLLECT,
- COMPUTE_COLLECT,
- COMPUTE,
- STORE,
- OTHER,
- ETS,
- JOIN
- }
-
- public enum Mode {
- PROCESS, // There is memory
- SPILL, // Memory budget has been consumed. Now we're writing to disk
- DISCARD // Memory budget has been consumed. Disk space budget has been consumed. Now we're
- // discarding
- }
-
- /**
- * @return the unique runtime id associated with the feedRuntime
- */
- public FeedRuntimeId getRuntimeId();
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
index 1ca46ce..397f797 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
@@ -18,13 +18,14 @@
*/
package org.apache.asterix.external.feed.api;
+import org.apache.asterix.active.IActiveRuntime;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Represent a feed runtime whose output can be routed along other parallel path(s).
*/
-public interface ISubscribableRuntime extends IFeedRuntime {
+public interface ISubscribableRuntime extends IActiveRuntime {
/**
* @param collectionRuntime
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index f356899..ae2e0b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -21,13 +21,11 @@ package org.apache.asterix.external.feed.dataflow;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
/**
* Provides mechanism for distributing the frames, as received from an operator to a
@@ -38,7 +36,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class DistributeFeedFrameWriter implements IFrameWriter {
/** A unique identifier for the feed to which the incoming tuples belong. **/
- private final FeedId feedId;
+ private final EntityId feedId;
/**
* An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each
@@ -55,8 +53,8 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
/** The value of the partition 'i' if this is the i'th instance of the associated operator **/
private final int partition;
- public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
- FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta) throws IOException {
+ public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType,
+ int partition) throws IOException {
this.feedId = feedId;
this.frameDistributor = new FrameDistributor();
this.feedRuntimeType = feedRuntimeType;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 8ed2bf9..2e1c83f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -23,11 +23,12 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ConcurrentFramePool;
+import org.apache.asterix.active.FrameAction;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.Mode;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -67,7 +68,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
private int numProcessedInMemory = 0;
private int numStalled = 0;
- public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, ActiveRuntimeId runtimeId,
IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
throws HyracksDataException {
this.writer = writer;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
deleted file mode 100644
index f02b4aa..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-
-import org.apache.log4j.Logger;
-
-public class FrameAction {
- private static final boolean DEBUG = false;
- private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
- private ByteBuffer allocated;
- private ByteBuffer frame;
-
- public void call(ByteBuffer freeFrame) {
- if (DEBUG) {
- LOGGER.info("FrameAction: My subscription is being answered");
- }
- freeFrame.put(frame);
- synchronized (this) {
- allocated = freeFrame;
- if (DEBUG) {
- LOGGER.info("FrameAction: Waking up waiting threads");
- }
- notifyAll();
- }
- }
-
- public synchronized ByteBuffer retrieve() throws InterruptedException {
- if (DEBUG) {
- LOGGER.info("FrameAction: Attempting to get allocated buffer");
- }
- while (allocated == null) {
- if (DEBUG) {
- LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
- }
- wait();
- if (DEBUG) {
- LOGGER.info("FrameAction: Awoken Up");
- }
- }
- ByteBuffer temp = allocated;
- allocated = null;
- return temp;
- }
-
- public void setFrame(ByteBuffer frame) {
- this.frame = frame;
- }
-
- public int getSize() {
- return frame.capacity();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
deleted file mode 100644
index e5543d6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.external.feed.dataflow.FrameAction;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class ConcurrentFramePool {
- private static final boolean DEBUG = false;
- private static final String ERROR_INVALID_FRAME_SIZE =
- "The size should be an integral multiple of the default frame size";
- private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
- "The requested frame size must not be greater than the allocated budget";
- private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
- private final String nodeId;
- private final int budget;
- private final int defaultFrameSize;
- private final ArrayDeque<ByteBuffer> pool;
- private final ArrayDeque<FrameAction> subscribers = new ArrayDeque<>();
- private final Map<Integer, ArrayDeque<ByteBuffer>> largeFramesPools;
- private int handedOut;
- private int created;
-
- public ConcurrentFramePool(String nodeId, long budgetInBytes, int frameSize) {
- this.nodeId = nodeId;
- this.defaultFrameSize = frameSize;
- this.budget = (int) (budgetInBytes / frameSize);
- this.pool = new ArrayDeque<>(budget);
- this.largeFramesPools = new HashMap<>();
- }
-
- public int getMaxFrameSize() {
- return budget * defaultFrameSize;
- }
-
- public synchronized ByteBuffer get() {
- // Subscribers have higher priority
- if (subscribers.isEmpty()) {
- return doGet();
- }
- if (DEBUG) {
- LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
- + subscribers.size());
- }
- return null;
- }
-
- private ByteBuffer doGet() {
- if (handedOut < budget) {
- handedOut++;
- return allocate();
- }
- if (DEBUG) {
- LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
- + ", Requested = 1");
- }
- return null;
- }
-
- public int remaining() {
- return budget - handedOut;
- }
-
- private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
- // Subscribers have higher priority
- if (bufferSize % defaultFrameSize != 0) {
- throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
- }
- int multiplier = bufferSize / defaultFrameSize;
- if (multiplier > budget) {
- throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
- }
- if (handedOut + multiplier <= budget) {
- handedOut += multiplier;
- ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
- if (largeFramesPool == null || largeFramesPool.isEmpty()) {
- if (created + multiplier > budget) {
- freeup(multiplier);
- }
- created += multiplier;
- return ByteBuffer.allocate(bufferSize);
- }
- ByteBuffer buffer = largeFramesPool.poll();
- buffer.clear();
- return buffer;
- }
- // Not enough budget
- if (DEBUG) {
- LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
- + ", Requested = " + multiplier);
- }
- return null;
- }
-
- public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
- if (subscribers.isEmpty()) {
- return doGet(bufferSize);
- }
- if (DEBUG) {
- LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
- + subscribers.size());
- }
- return null;
- }
-
- private int freeup(int desiredNumberOfFreePages) {
- int needToFree = desiredNumberOfFreePages - (budget - created);
- int freed = 0;
- // start by large frames
- for (Iterator<Entry<Integer, ArrayDeque<ByteBuffer>>> it = largeFramesPools.entrySet().iterator(); it
- .hasNext();) {
- Entry<Integer, ArrayDeque<ByteBuffer>> entry = it.next();
- if (entry.getKey() != desiredNumberOfFreePages) {
- while (!entry.getValue().isEmpty()) {
- entry.getValue().pop();
- freed += entry.getKey();
- if (freed >= needToFree) {
- // created is handled here
- created -= freed;
- return freed;
- }
- }
- it.remove();
- }
- }
- // freed all large pages. need to free small pages as well
- needToFree -= freed;
- while (needToFree > 0) {
- pool.pop();
- needToFree--;
- freed++;
- }
- created -= freed;
- return freed;
- }
-
- private ByteBuffer allocate() {
- if (pool.isEmpty()) {
- if (created == budget) {
- freeup(1);
- }
- created++;
- return ByteBuffer.allocate(defaultFrameSize);
- } else {
- ByteBuffer buffer = pool.pop();
- buffer.clear();
- return buffer;
- }
- }
-
- public synchronized boolean get(Collection<ByteBuffer> buffers, int count) {
- if (handedOut + count <= budget) {
- handedOut += count;
- for (int i = 0; i < count; i++) {
- buffers.add(allocate());
- }
- return true;
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "ConcurrentFramePool [" + nodeId + "]" + "(consumed:" + handedOut + "/" + budget + ")";
- }
-
- public synchronized void release(Collection<ByteBuffer> buffers) throws HyracksDataException {
- for (ByteBuffer buffer : buffers) {
- release(buffer);
- }
- }
-
- public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
- int multiples = buffer.capacity() / defaultFrameSize;
- handedOut -= multiples;
- if (DEBUG) {
- LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
- }
- if (multiples == 1) {
- pool.add(buffer);
- } else {
- ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiples);
- if (largeFramesPool == null) {
- largeFramesPool = new ArrayDeque<>();
- largeFramesPools.put(multiples, largeFramesPool);
- }
- largeFramesPool.push(buffer);
- }
- // check subscribers
- while (!subscribers.isEmpty()) {
- FrameAction frameAction = subscribers.peek();
- ByteBuffer freeBuffer;
- // check if we have enough and answer immediately.
- if (frameAction.getSize() == defaultFrameSize) {
- if (DEBUG) {
- LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
- }
- freeBuffer = doGet();
- } else {
- if (DEBUG) {
- LOGGER.info("Attempting to callback a subscriber that requested "
- + frameAction.getSize() / defaultFrameSize + " frames");
- }
- freeBuffer = doGet(frameAction.getSize());
- }
- if (freeBuffer != null) {
- int handedOutBeforeCall = handedOut;
- try {
- frameAction.call(freeBuffer);
- } catch (Exception e) {
- LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
- // TODO(amoudi): Add test cases and get rid of recursion
- if (handedOut == handedOutBeforeCall) {
- release(freeBuffer);
- }
- throw e;
- } finally {
- subscribers.remove();
- if (DEBUG) {
- LOGGER.info(
- "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
- }
- }
- } else {
- if (DEBUG) {
- LOGGER.info("Failed to allocate requested frames");
- }
- break;
- }
- }
- if (DEBUG) {
- LOGGER.info(subscribers.size() + " remaining subscribers");
- }
- }
-
- public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
- // check if subscribers are empty?
- if (subscribers.isEmpty()) {
- ByteBuffer buffer;
- // check if we have enough and answer immediately.
- if (frameAction.getSize() == defaultFrameSize) {
- buffer = doGet();
- } else {
- buffer = doGet(frameAction.getSize());
- }
- if (buffer != null) {
- frameAction.call(buffer);
- // There is no need to subscribe. perform action and return false
- return false;
- }
- } else {
- int multiplier = frameAction.getSize() / defaultFrameSize;
- if (multiplier > budget) {
- throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
- }
- }
- // none of the above, add to subscribers and return true
- subscribers.add(frameAction);
- return true;
- }
-
- /*
- * For unit testing purposes
- */
- public Collection<FrameAction> getSubscribers() {
- return subscribers;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
deleted file mode 100644
index 9f861d4..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedCollectInfo extends FeedInfo {
- public FeedId sourceFeedId;
- public FeedConnectionId feedConnectionId;
- public List<String> collectLocations = new ArrayList<String>();
- public List<String> computeLocations = new ArrayList<String>();
- public List<String> storageLocations = new ArrayList<String>();
- public Map<String, String> feedPolicy;
- public String superFeedManagerHost;
- public int superFeedManagerPort;
- public boolean fullyConnected;
-
- public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
- JobId jobId, Map<String, String> feedPolicy) {
- super(jobSpec, jobId, FeedInfoType.COLLECT);
- this.sourceFeedId = sourceFeedId;
- this.feedConnectionId = feedConnectionId;
- this.feedPolicy = feedPolicy;
- this.fullyConnected = true;
- }
-
- @Override
- public String toString() {
- return FeedInfoType.COLLECT + "[" + feedConnectionId + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
index 13f19b8..e2ab823 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.feed.management;
import java.io.Serializable;
+import org.apache.asterix.active.EntityId;
+
/**
* A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a
* dataset.
@@ -27,22 +29,23 @@ import java.io.Serializable;
public class FeedConnectionId implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String FEED_EXTENSION_NAME = "Feed";
- private final FeedId feedId; // Dataverse - Feed
- private final String datasetName; // Dataset <Dataset is empty in case of no target dataset>
+ private final EntityId feedId; // Dataverse - Feed
+ private final String datasetName; // Dataset <Dataset is empty in case of no target dataset>
private final int hash;
- public FeedConnectionId(FeedId feedId, String datasetName) {
+ public FeedConnectionId(EntityId feedId, String datasetName) {
this.feedId = feedId;
this.datasetName = datasetName;
this.hash = toString().hashCode();
}
public FeedConnectionId(String dataverse, String feedName, String datasetName) {
- this(new FeedId(dataverse, feedName), datasetName);
+ this(new EntityId(FEED_EXTENSION_NAME, dataverse, feedName), datasetName);
}
- public FeedId getFeedId() {
+ public EntityId getFeedId() {
return feedId;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
deleted file mode 100644
index a746ef4..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedConnectionManager implements IFeedConnectionManager {
-
- private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
-
- private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
- private final String nodeId;
-
- public FeedConnectionManager(String nodeId) {
- this.nodeId = nodeId;
- }
-
- @Override
- public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
- return feedRuntimeManagers.get(feedId);
- }
-
- @Override
- public void deregisterFeed(FeedConnectionId feedId) {
- try {
- FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
- if (mgr != null) {
- mgr.close();
- feedRuntimeManagers.remove(feedId);
- }
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
- }
- }
-
- }
-
- @Override
- public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
- if (runtimeMgr == null) {
- runtimeMgr = new FeedRuntimeManager(connectionId, this);
- feedRuntimeManagers.put(connectionId, runtimeMgr);
- }
- runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
- }
-
- @Override
- public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
- if (runtimeMgr != null) {
- runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
- }
- }
-
- @Override
- public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
- FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
- return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
- }
-
- @Override
- public String toString() {
- return "FeedManager " + "[" + nodeId + "]";
- }
-
- @Override
- public List<FeedRuntimeId> getRegisteredRuntimes() {
- List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
- for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
- runtimes.addAll(entry.getValue().getFeedRuntimes());
- }
- return runtimes;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
index b1d3300..1106160 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
@@ -18,16 +18,20 @@
*/
package org.apache.asterix.external.feed.management;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.commons.lang3.StringUtils;
/**
* A request for connecting a feed to a dataset.
*/
-public class FeedConnectionRequest {
+public class FeedConnectionRequest implements Serializable {
+
+ private static final long serialVersionUID = 1L;
public enum ConnectionStatus {
/** initial state upon creating a connection request **/
@@ -64,11 +68,11 @@ public class FeedConnectionRequest {
/** Target dataset associated with the connection request **/
private final String targetDataset;
- private final FeedId receivingFeedId;
+ private final EntityId receivingFeedId;
public FeedConnectionRequest(FeedJointKey feedPointKey, FeedRuntimeType connectionLocation,
List<String> functionsToApply, String targetDataset, String policy, Map<String, String> policyParameters,
- FeedId receivingFeedId) {
+ EntityId receivingFeedId) {
this.feedJointKey = feedPointKey;
this.connectionLocation = connectionLocation;
this.functionsToApply = functionsToApply;
@@ -103,7 +107,7 @@ public class FeedConnectionRequest {
return connectionLocation;
}
- public FeedId getReceivingFeedId() {
+ public EntityId getReceivingFeedId() {
return receivingFeedId;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
new file mode 100644
index 0000000..40d2500
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.api.FeedOperationCounter;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedJoint.State;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.util.FeedUtils.JobType;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
+import org.apache.log4j.Logger;
+
+public class FeedEventsListener implements IActiveEntityEventsListener {
+ private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
+ private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
+ private final List<IFeedLifecycleEventSubscriber> subscribers;
+ private final Map<Long, ActiveJob> jobs;
+ private final Map<Long, ActiveJob> intakeJobs;
+ private final Map<EntityId, FeedIntakeInfo> entity2Intake;
+ private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
+ private EntityId entityId;
+ private IFeedJoint sourceFeedJoint;
+
+ public FeedEventsListener(EntityId entityId) {
+ this.entityId = entityId;
+ subscribers = new ArrayList<>();
+ jobs = new HashMap<>();
+ feedPipeline = new HashMap<>();
+ entity2Intake = new HashMap<>();
+ connectJobInfos = new HashMap<>();
+ intakeJobs = new HashMap<>();
+ }
+
+ @Override
+ public void notify(ActiveEvent event) {
+ try {
+ switch (event.getEventKind()) {
+ case JOB_START:
+ handleJobStartEvent(event);
+ break;
+ case JOB_FINISH:
+ handleJobFinishEvent(event);
+ break;
+ case PARTITION_EVENT:
+ handlePartitionStart(event);
+ break;
+ default:
+ LOGGER.warn("Unknown Feed Event" + event);
+ break;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Unhandled Exception", e);
+ }
+ }
+
+ private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+ ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+ JobType jobType = (JobType) jobInfo.getJobObject();
+ switch (jobType) {
+ case INTAKE:
+ handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
+ break;
+ case FEED_CONNECT:
+ handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
+ break;
+ default:
+ }
+ }
+
+ private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+ ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+ JobType jobType = (JobType) jobInfo.getJobObject();
+ switch (jobType) {
+ case FEED_CONNECT:
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Collect Job finished for " + jobInfo);
+ }
+ handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
+ break;
+ case INTAKE:
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
+ }
+ handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private synchronized void handlePartitionStart(ActiveEvent message) {
+ ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+ JobType jobType = (JobType) jobInfo.getJobObject();
+ switch (jobType) {
+ case FEED_CONNECT:
+ ((FeedConnectJobInfo) jobInfo).partitionStart();
+ if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
+ notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+ }
+ break;
+ case INTAKE:
+ handleIntakePartitionStarts(message, jobInfo);
+ break;
+ default:
+ break;
+
+ }
+ }
+
+ private void handleIntakePartitionStarts(ActiveEvent message, ActiveJob jobInfo) {
+ if (feedPipeline.get(message.getFeedId()).first.decrementAndGet() == 0) {
+ ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
+ jobInfo.setState(ActivityState.ACTIVE);
+ notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+ }
+ }
+
+ public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) {
+ Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline =
+ feedPipeline.get(feedJoint.getOwnerFeedId());
+ if (feedJointsOnPipeline == null) {
+ feedJointsOnPipeline = new Pair<>(new FeedOperationCounter(numOfPrividers), new ArrayList<IFeedJoint>());
+ feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
+ feedJointsOnPipeline.second.add(feedJoint);
+ } else {
+ if (!feedJointsOnPipeline.second.contains(feedJoint)) {
+ feedJointsOnPipeline.second.add(feedJoint);
+ } else {
+ throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
+ }
+ }
+ }
+
+ public synchronized void deregisterFeedIntakeJob(JobId jobId) {
+ FeedIntakeInfo info = (FeedIntakeInfo) intakeJobs.remove(jobId.getId());
+ jobs.remove(jobId.getId());
+ entity2Intake.remove(info.getFeedId());
+ List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second;
+ joints.remove(info.getIntakeFeedJoint());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Deregistered feed intake job [" + jobId + "]");
+ }
+ }
+
+ private static synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
+ List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<>();
+ Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+ IOperatorDescriptor opDesc = entry.getValue();
+ if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+ intakeOperatorIds.add(opDesc.getOperatorId());
+ }
+ }
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
+ List<String> intakeLocations = new ArrayList<>();
+ for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ intakeLocations.add(operatorLocations.get(i));
+ }
+ }
+ // intakeLocations is an ordered list;
+ // element at position i corresponds to location of i'th instance of operator
+ intakeJobInfo.setIntakeLocation(intakeLocations);
+ }
+
+ public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ if (cInfo != null) {
+ return cInfo.getSourceFeedJoint();
+ }
+ return null;
+ }
+
+ public synchronized void registerFeedIntakeJob(EntityId feedId, JobId jobId, JobSpecification jobSpec)
+ throws HyracksDataException {
+ if (entity2Intake.get(feedId) != null) {
+ throw new IllegalStateException("Feed already has an intake job");
+ }
+ if (intakeJobs.get(jobId.getId()) != null) {
+ throw new IllegalStateException("Feed job already registered in intake jobs");
+ }
+ if (jobs.get(jobId.getId()) != null) {
+ throw new IllegalStateException("Feed job already registered in all jobs");
+ }
+
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
+ sourceFeedJoint = null;
+ for (IFeedJoint joint : pair.second) {
+ if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
+ sourceFeedJoint = joint;
+ break;
+ }
+ }
+
+ if (sourceFeedJoint != null) {
+ FeedIntakeInfo intakeJobInfo =
+ new FeedIntakeInfo(jobId, ActivityState.CREATED, feedId, sourceFeedJoint, jobSpec);
+ pair.first.setFeedJobInfo(intakeJobInfo);
+ entity2Intake.put(feedId, intakeJobInfo);
+ jobs.put(jobId.getId(), intakeJobInfo);
+ intakeJobs.put(jobId.getId(), intakeJobInfo);
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
+ }
+ } else {
+ throw new HyracksDataException(
+ "Could not register feed intake job [" + jobId + "]" + " for feed " + feedId);
+ }
+ }
+
+ public synchronized void registerFeedCollectionJob(EntityId sourceFeedId, FeedConnectionId connectionId,
+ JobId jobId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
+ if (jobs.get(jobId.getId()) != null) {
+ throw new IllegalStateException("Feed job already registered");
+ }
+ if (connectJobInfos.containsKey(jobId.getId())) {
+ throw new IllegalStateException("Feed job already registered");
+ }
+
+ List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second;
+ FeedConnectionId cid = null;
+ IFeedJoint collectionSourceFeedJoint = null;
+ for (IFeedJoint joint : feedJoints) {
+ cid = joint.getReceiver(connectionId);
+ if (cid != null) {
+ collectionSourceFeedJoint = joint;
+ break;
+ }
+ }
+
+ if (cid != null) {
+ FeedConnectJobInfo cInfo = new FeedConnectJobInfo(sourceFeedId, jobId, ActivityState.CREATED, connectionId,
+ collectionSourceFeedJoint, null, jobSpec, feedPolicy);
+ jobs.put(jobId.getId(), cInfo);
+ connectJobInfos.put(connectionId, cInfo);
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
+ }
+ } else {
+ LOGGER.warn(
+ "Could not register feed collection job [" + jobId + "]" + " for feed connection " + connectionId);
+ }
+ }
+
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification spec) {
+ FeedConnectionId feedConnectionId = null;
+ Map<String, String> feedPolicy = null;
+ try {
+ for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+ if (opDesc instanceof FeedCollectOperatorDescriptor) {
+ feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId();
+ feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties();
+ registerFeedCollectionJob(((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(),
+ feedConnectionId, jobId, spec, feedPolicy);
+ return;
+ } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+ registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(), jobId, spec);
+ return;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e);
+ }
+ }
+
+ public synchronized List<String> getConnectionLocations(IFeedJoint feedJoint, final FeedConnectionRequest request)
+ throws Exception {
+ List<String> locations = null;
+ switch (feedJoint.getType()) {
+ case COMPUTE:
+ FeedConnectionId connectionId = feedJoint.getProvider();
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ locations = cInfo.getComputeLocations();
+ break;
+ case INTAKE:
+ FeedIntakeInfo intakeInfo = entity2Intake.get(feedJoint.getOwnerFeedId());
+ locations = intakeInfo.getIntakeLocation();
+ break;
+ default:
+ break;
+ }
+ return locations;
+ }
+
+ private void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
+ if (subscribers != null && !subscribers.isEmpty()) {
+ for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
+ subscriber.handleFeedEvent(event);
+ }
+ }
+ }
+
+ private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, ActiveEvent message)
+ throws Exception {
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(message.getJobId());
+ JobStatus status = info.getStatus();
+ EntityId feedId = intakeInfo.getFeedId();
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
+ if (status.equals(JobStatus.FAILURE)) {
+ pair.first.setFailedIngestion(true);
+ }
+ // remove feed joints
+ deregisterFeedIntakeJob(message.getJobId());
+ // notify event listeners
+ feedPipeline.remove(feedId);
+ entity2Intake.remove(feedId);
+ notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
+ : FeedLifecycleEvent.FEED_INTAKE_ENDED);
+ }
+
+ private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
+ FeedConnectionId connectionId = cInfo.getConnectionId();
+
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ JobStatus status = info.getStatus();
+ boolean failure = status != null && status.equals(JobStatus.FAILURE);
+ FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
+ boolean retainSubsription =
+ cInfo.getState().equals(ActivityState.UNDER_RECOVERY) || (failure && fpa.continueOnHardwareFailure());
+
+ if (!retainSubsription) {
+ IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
+ feedJoint.removeReceiver(connectionId);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
+ }
+ }
+
+ connectJobInfos.remove(connectionId);
+ jobs.remove(cInfo.getJobId().getId());
+ // notify event listeners
+ FeedLifecycleEvent event =
+ failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_COLLECT_ENDED;
+ notifyFeedEventSubscribers(event);
+ }
+
+ public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getStorageLocations();
+ }
+
+ public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getCollectLocations();
+ }
+
+ public List<String> getFeedIntakeLocations(EntityId feedId) {
+ return entity2Intake.get(feedId).getIntakeLocation();
+ }
+
+ public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getJobId();
+ }
+
+ public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
+ List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
+ ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
+ if (joints != null && !joints.isEmpty()) {
+ for (IFeedJoint joint : joints) {
+ if (joint.getFeedJointKey().equals(feedJointKey)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public Collection<IFeedJoint> getFeedIntakeJoints() {
+ List<IFeedJoint> intakeFeedPoints = new ArrayList<>();
+ for (FeedIntakeInfo info : entity2Intake.values()) {
+ intakeFeedPoints.add(info.getIntakeFeedJoint());
+ }
+ return intakeFeedPoints;
+ }
+
+ public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
+ List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId())
+ ? feedPipeline.get(feedPointKey.getFeedId()).second : null;
+ if (joints != null && !joints.isEmpty()) {
+ for (IFeedJoint joint : joints) {
+ if (joint.getFeedJointKey().equals(feedPointKey)) {
+ return joint;
+ }
+ }
+ }
+ return null;
+ }
+
+ public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
+ IFeedJoint feedJoint = getFeedJoint(feedJointKey);
+ if (feedJoint != null) {
+ return feedJoint;
+ } else {
+ String jointKeyString = feedJointKey.getStringRep();
+ List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId())
+ ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
+ IFeedJoint candidateJoint = null;
+ if (jointsOnPipeline != null) {
+ for (IFeedJoint joint : jointsOnPipeline) {
+ if (jointKeyString.contains(joint.getFeedJointKey().getStringRep()) && (candidateJoint == null
+ || /*found feed point is a super set of the earlier find*/joint.getFeedJointKey()
+ .getStringRep().contains(candidateJoint.getFeedJointKey().getStringRep()))) {
+ candidateJoint = joint;
+ }
+ }
+ }
+ return candidateJoint;
+ }
+ }
+
+ public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId).getSpec();
+ }
+
+ public IFeedJoint getFeedPoint(EntityId sourceFeedId, IFeedJoint.FeedJointType type) {
+ List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second;
+ for (IFeedJoint joint : joints) {
+ if (joint.getType().equals(type)) {
+ return joint;
+ }
+ }
+ return null;
+ }
+
+ private void setLocations(FeedConnectJobInfo cInfo) {
+ JobSpecification jobSpec = cInfo.getSpec();
+
+ List<OperatorDescriptorId> collectOperatorIds = new ArrayList<>();
+ List<OperatorDescriptorId> computeOperatorIds = new ArrayList<>();
+ List<OperatorDescriptorId> storageOperatorIds = new ArrayList<>();
+
+ Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+ IOperatorDescriptor opDesc = entry.getValue();
+ IOperatorDescriptor actualOp;
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
+ } else {
+ actualOp = opDesc;
+ }
+
+ if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
+ AlgebricksMetaOperatorDescriptor op = (AlgebricksMetaOperatorDescriptor) actualOp;
+ IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
+ boolean computeOp = false;
+ for (IPushRuntimeFactory rf : runtimeFactories) {
+ if (rf instanceof AssignRuntimeFactory) {
+ IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0);
+ IOperatorDescriptor sourceOp =
+ jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId()).getLeft().getLeft();
+ if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+ computeOp = true;
+ break;
+ }
+ }
+ }
+ if (computeOp) {
+ computeOperatorIds.add(entry.getKey());
+ }
+ } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+ storageOperatorIds.add(entry.getKey());
+ } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
+ collectOperatorIds.add(entry.getKey());
+ }
+ }
+
+ try {
+ IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ List<String> collectLocations = new ArrayList<>();
+ for (OperatorDescriptorId collectOpId : collectOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ collectLocations.add(operatorLocations.get(i));
+ }
+ }
+
+ List<String> computeLocations = new ArrayList<>();
+ for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
+ if (operatorLocations != null) {
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ computeLocations.add(operatorLocations.get(i));
+ }
+ } else {
+ computeLocations.clear();
+ computeLocations.addAll(collectLocations);
+ }
+ }
+
+ List<String> storageLocations = new ArrayList<>();
+ for (OperatorDescriptorId storageOpId : storageOperatorIds) {
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
+ if (operatorLocations == null) {
+ continue;
+ }
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ storageLocations.add(operatorLocations.get(i));
+ }
+ }
+ cInfo.setCollectLocations(collectLocations);
+ cInfo.setComputeLocations(computeLocations);
+ cInfo.setStorageLocations(storageLocations);
+
+ } catch (Exception e) {
+ LOGGER.error("Error while setting feed active locations", e);
+ }
+
+ }
+
+ public synchronized void registerFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+ subscribers.add(subscriber);
+ }
+
+ public void deregisterFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+ subscribers.remove(subscriber);
+ }
+
+ public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
+ IFeedLifecycleEventSubscriber eventSubscriber) {
+ boolean active = false;
+ FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+ if (cInfo != null) {
+ active = cInfo.getState().equals(ActivityState.ACTIVE);
+ }
+ if (active) {
+ registerFeedEventSubscriber(eventSubscriber);
+ }
+ return active;
+ }
+
+ public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
+ return connectJobInfos.get(connectionId);
+ }
+
+ private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws ACIDException {
+ // set locations of feed sub-operations (intake, compute, store)
+ setLocations(cInfo);
+ Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+ // activate joints
+ List<IFeedJoint> joints = pair.second;
+ for (IFeedJoint joint : joints) {
+ if (joint.getProvider().equals(cInfo.getConnectionId())) {
+ joint.setState(State.ACTIVE);
+ if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
+ cInfo.setComputeFeedJoint(joint);
+ }
+ }
+ }
+ cInfo.setState(ActivityState.ACTIVE);
+ }
+
+ public synchronized boolean isConnectedToDataset(String datasetName) {
+ for (FeedConnectionId connection : connectJobInfos.keySet()) {
+ if (connection.getDatasetName().equals(datasetName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public FeedConnectionId[] getConnections() {
+ return connectJobInfos.keySet().toArray(new FeedConnectionId[connectJobInfos.size()]);
+ }
+
+ public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
+ return isFeedPointAvailable(feedJointKey);
+ }
+
+ @Override
+ public boolean isEntityActive() {
+ return !jobs.isEmpty();
+ }
+
+ @Override
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ public IFeedJoint getSourceFeedJoint() {
+ return sourceFeedJoint;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java
deleted file mode 100644
index 3145d72..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a data feed.
- */
-public class FeedId implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final String dataverse;
- private final String feedName;
-
- public FeedId(String dataverse, String feedName) {
- this.dataverse = dataverse;
- this.feedName = feedName;
- }
-
- public String getDataverse() {
- return dataverse;
- }
-
- public String getFeedName() {
- return feedName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null || !(o instanceof FeedId)) {
- return false;
- }
- if (this == o || ((FeedId) o).getFeedName().equals(feedName) && ((FeedId) o).getDataverse().equals(dataverse)) {
- return true;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public String toString() {
- return dataverse + "." + feedName;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
index edbaf7c..2905bb2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.external.feed.management;
+import java.io.Serializable;
import java.util.List;
+import org.apache.asterix.active.EntityId;
import org.apache.commons.lang3.StringUtils;
/**
@@ -28,13 +30,14 @@ import org.apache.commons.lang3.StringUtils;
* constitute the feed. The feed joint acts as a network tap and allows the flowing data to be
* routed to multiple paths.
*/
-public class FeedJointKey {
+public class FeedJointKey implements Serializable {
- private final FeedId primaryFeedId;
+ private static final long serialVersionUID = 1L;
+ private final EntityId primaryFeedId;
private final List<String> appliedFunctions;
private final String stringRep;
- public FeedJointKey(FeedId feedId, List<String> appliedFunctions) {
+ public FeedJointKey(EntityId feedId, List<String> appliedFunctions) {
this.primaryFeedId = feedId;
this.appliedFunctions = appliedFunctions;
StringBuilder builder = new StringBuilder();
@@ -44,7 +47,7 @@ public class FeedJointKey {
stringRep = builder.toString();
}
- public FeedId getFeedId() {
+ public EntityId getFeedId() {
return primaryFeedId;
}
@@ -68,8 +71,9 @@ public class FeedJointKey {
@Override
public boolean equals(Object o) {
- if (this == o)
+ if (this == o) {
return true;
+ }
if (o == null || !(o instanceof FeedJointKey)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
deleted file mode 100644
index 62f4c6c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedManager {
-
- private final Map<FeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
-
- private final IFeedConnectionManager feedConnectionManager;
-
- private final ConcurrentFramePool feedMemoryManager;
-
- private final AsterixFeedProperties asterixFeedProperties;
-
- private final String nodeId;
-
- private final int frameSize;
-
- public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize)
- throws AsterixException, HyracksDataException {
- this.nodeId = nodeId;
- this.feedConnectionManager = new FeedConnectionManager(nodeId);
- this.feedMemoryManager =
- new ConcurrentFramePool(nodeId, feedProperties.getMemoryComponentGlobalBudget(), frameSize);
- this.frameSize = frameSize;
- this.asterixFeedProperties = feedProperties;
- this.subscribableRuntimes = new ConcurrentHashMap<FeedRuntimeId, ISubscribableRuntime>();
- }
-
- public IFeedConnectionManager getFeedConnectionManager() {
- return feedConnectionManager;
- }
-
- public ConcurrentFramePool getFeedMemoryManager() {
- return feedMemoryManager;
- }
-
- public int getFrameSize() {
- return frameSize;
- }
-
- public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
- FeedRuntimeId sid = subscribableRuntime.getRuntimeId();
- if (!subscribableRuntimes.containsKey(sid)) {
- subscribableRuntimes.put(sid, subscribableRuntime);
- }
- }
-
- public void deregisterFeedSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
- subscribableRuntimes.remove(subscribableRuntimeId);
- }
-
- public ISubscribableRuntime getSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
- return subscribableRuntimes.get(subscribableRuntimeId);
- }
-
- @Override
- public String toString() {
- return "FeedManager " + "[" + nodeId + "]";
- }
-
- public AsterixFeedProperties getAsterixFeedProperties() {
- return asterixFeedProperties;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java
deleted file mode 100644
index f7e98f7..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public class FeedRuntimeManager {
-
- private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
-
- private final FeedConnectionId connectionId;
- private final IFeedConnectionManager connectionManager;
- private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
-
- private final ExecutorService executorService;
-
- public FeedRuntimeManager(FeedConnectionId connectionId, IFeedConnectionManager feedConnectionManager) {
- this.connectionId = connectionId;
- this.feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
- this.executorService = Executors.newCachedThreadPool();
- this.connectionManager = feedConnectionManager;
- }
-
- public void close() throws IOException {
- if (executorService != null) {
- executorService.shutdownNow();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + connectionId);
- }
- }
- }
-
- public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
- return feedRuntimes.get(runtimeId);
- }
-
- public void registerFeedRuntime(FeedRuntimeId runtimeId, FeedRuntime feedRuntime) {
- feedRuntimes.put(runtimeId, feedRuntime);
- }
-
- public synchronized void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
- feedRuntimes.remove(runtimeId);
- if (feedRuntimes.isEmpty()) {
- connectionManager.deregisterFeed(connectionId);
- }
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public Set<FeedRuntimeId> getFeedRuntimes() {
- return feedRuntimes.keySet();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
index b0f7624..06aafdd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
@@ -18,22 +18,24 @@
*/
package org.apache.asterix.external.feed.message;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.json.JSONException;
+import org.json.JSONObject;
/**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator involved in the feed pipeline.
+ * @deprecated A feed control message indicating the need to end the feed. This message is dispatched
+ * to all locations that host an operator involved in the feed pipeline.
+ * Instead, use IMessageBroker messages
*/
+@Deprecated
public class EndFeedMessage extends FeedMessage {
private static final long serialVersionUID = 1L;
- private final FeedId sourceFeedId;
+ private final EntityId sourceFeedId;
private final FeedConnectionId connectionId;
@@ -48,7 +50,7 @@ public class EndFeedMessage extends FeedMessage {
DISCONTINUE_SOURCE
}
- public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, FeedId sourceFeedId,
+ public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, EntityId sourceFeedId,
boolean completeDisconnection, EndMessageType endMessageType) {
super(MessageType.END);
this.connectionId = connectionId;
@@ -67,7 +69,7 @@ public class EndFeedMessage extends FeedMessage {
return sourceRuntimeType;
}
- public FeedId getSourceFeedId() {
+ public EntityId getSourceFeedId() {
return sourceFeedId;
}
@@ -84,7 +86,7 @@ public class EndFeedMessage extends FeedMessage {
JSONObject obj = new JSONObject();
obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getEntityName());
obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
return obj;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
index f2b354b..4f57fb5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.external.feed.message;
-import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.active.IActiveMessage;
import org.apache.hyracks.api.dataflow.value.JSONSerializable;
/**
* A control message that can be sent to the runtime instance of a
* feed's adapter.
*/
-public abstract class FeedMessage implements IFeedMessage, JSONSerializable {
+public abstract class FeedMessage implements IActiveMessage, JSONSerializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
deleted file mode 100644
index 49b23ed..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedPartitionStartMessage extends AbstractApplicationMessage {
-
- private static final long serialVersionUID = 1L;
- private final FeedId feedId;
- private final JobId jobId;
-
- public FeedPartitionStartMessage(FeedId feedId, JobId jobId) {
- this.feedId = feedId;
- this.jobId = jobId;
- }
-
- @Override
- public ApplicationMessageType getMessageType() {
- return ApplicationMessageType.FEED_PROVIDER_READY;
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index b604db5..f2f0747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -22,9 +22,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.management.FeedId;
import org.apache.log4j.Logger;
/**
@@ -34,7 +34,7 @@ public class AdapterRuntimeManager {
private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
- private final FeedId feedId; // (dataverse-feed)
+ private final EntityId feedId; // (dataverse-feed)
private final FeedAdapter feedAdapter; // The adapter
@@ -49,7 +49,7 @@ public class AdapterRuntimeManager {
private volatile boolean done = false;
private volatile boolean failed = false;
- public AdapterRuntimeManager(FeedId feedId, FeedAdapter feedAdapter, DistributeFeedFrameWriter writer,
+ public AdapterRuntimeManager(EntityId feedId, FeedAdapter feedAdapter, DistributeFeedFrameWriter writer,
int partition) {
this.feedId = feedId;
this.feedAdapter = feedAdapter;
@@ -86,7 +86,7 @@ public class AdapterRuntimeManager {
}
}
- public FeedId getFeedId() {
+ public EntityId getFeedId() {
return feedId;
}