You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 13:34:04 UTC
[2/7] asterixdb git commit: Refactor General Active Classes
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 6b1c5b8..294642e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.feed.runtime;
import java.util.Map;
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.api.ISubscriberRuntime;
import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
@@ -32,16 +34,17 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
* intake job. For a secondary feed, tuples are collected from the intake/compute
* runtime associated with the source feed.
*/
-public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
+public class CollectionRuntime extends ActiveRuntime implements ISubscriberRuntime {
- private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset]
- private final ISubscribableRuntime sourceRuntime; // Runtime that provides the data
- private final Map<String, String> feedPolicy; // Policy associated with the feed
- private final FeedFrameCollector frameCollector; // Collector that can be plugged into a frame distributor
+ private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset]
+ private final ISubscribableRuntime sourceRuntime; // Runtime that provides the data
+ private final Map<String, String> feedPolicy; // Policy associated with the feed
+ private final FeedFrameCollector frameCollector; // Collector that can be plugged into a frame distributor
private final IHyracksTaskContext ctx;
- public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ISubscribableRuntime sourceRuntime,
- Map<String, String> feedPolicy, IHyracksTaskContext ctx, FeedFrameCollector frameCollector) {
+ public CollectionRuntime(FeedConnectionId connectionId, ActiveRuntimeId runtimeId,
+ ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy, IHyracksTaskContext ctx,
+ FeedFrameCollector frameCollector) {
super(runtimeId);
this.connectionId = connectionId;
this.sourceRuntime = sourceRuntime;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
deleted file mode 100644
index 8c86d86..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import org.apache.asterix.external.feed.api.IFeedRuntime;
-
-public class FeedRuntime implements IFeedRuntime {
-
- /** A unique identifier for the runtime **/
- protected final FeedRuntimeId runtimeId;
-
- public FeedRuntime(FeedRuntimeId runtimeId) {
- this.runtimeId = runtimeId;;
- }
-
- @Override
- public FeedRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
- @Override
- public String toString() {
- return runtimeId.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
deleted file mode 100644
index 18d4cff..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.io.Serializable;
-
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedId;
-
-public class FeedRuntimeId implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public static final String DEFAULT_TARGET_ID = "N/A";
-
- private final FeedId feedId;
- private final FeedRuntimeType runtimeType;
- private final int partition;
- private final String targetId;
- private final int hashCode;
-
- public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) {
- this.feedId = feedId;
- this.runtimeType = runtimeType;
- this.partition = partition;
- this.targetId = targetId;
- this.hashCode = toString().hashCode();
- }
-
- @Override
- public String toString() {
- return runtimeType + "(" + feedId + ")" + "[" + partition + "]" + "==>" + "{" + targetId + "}";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof FeedRuntimeId)) {
- return false;
- }
- FeedRuntimeId other = (FeedRuntimeId) o;
- return (other.feedId.equals(feedId) && other.getFeedRuntimeType().equals(runtimeType)
- && other.getTargetId().equals(targetId) && other.getPartition() == partition);
- }
-
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- public FeedRuntimeType getFeedRuntimeType() {
- return runtimeType;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public FeedRuntimeType getRuntimeType() {
- return runtimeType;
- }
-
- public String getTargetId() {
- return targetId;
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 6cdc2af..8ee3e2b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -20,10 +20,11 @@ package org.apache.asterix.external.feed.runtime;
import java.util.logging.Level;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.api.ISubscriberRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-import org.apache.asterix.external.feed.management.FeedId;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,7 +35,7 @@ public class IngestionRuntime extends SubscribableRuntime {
private final IHyracksTaskContext ctx;
private int numSubscribers = 0;
- public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+ public IngestionRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
AdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
super(feedId, runtimeId, feedWriter);
this.adapterRuntimeManager = adaptorRuntimeManager;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
index e060e27..423e599 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
@@ -22,26 +22,28 @@ import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.api.ISubscriberRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.management.FeedId;
-public abstract class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
+public abstract class SubscribableRuntime extends ActiveRuntime implements ISubscribableRuntime {
protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
- protected final FeedId feedId;
+ protected final EntityId feedId;
protected final List<ISubscriberRuntime> subscribers;
protected final DistributeFeedFrameWriter dWriter;
- public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) {
+ public SubscribableRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) {
super(runtimeId);
this.feedId = feedId;
this.dWriter = dWriter;
this.subscribers = new ArrayList<ISubscriberRuntime>();
}
- public FeedId getFeedId() {
+ public EntityId getFeedId() {
return feedId;
}
@@ -49,8 +51,4 @@ public abstract class SubscribableRuntime extends FeedRuntime implements ISubscr
public String toString() {
return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
}
-
- public FeedRuntimeType getFeedRuntimeType() {
- return runtimeId.getFeedRuntimeType();
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
index b69a7b3..82cdddf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -21,13 +21,18 @@ package org.apache.asterix.external.feed.watch;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.api.IFeedJoint;
import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedUtils.JobType;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-public class FeedConnectJobInfo extends FeedJobInfo {
+public class FeedConnectJobInfo extends ActiveJob {
+ private static final long serialVersionUID = 1L;
private final FeedConnectionId connectionId;
private final Map<String, String> feedPolicy;
private final IFeedJoint sourceFeedJoint;
@@ -38,10 +43,10 @@ public class FeedConnectJobInfo extends FeedJobInfo {
private List<String> storageLocations;
private int partitionStarts = 0;
- public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
+ public FeedConnectJobInfo(EntityId entityId, JobId jobId, ActivityState state, FeedConnectionId connectionId,
IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
Map<String, String> feedPolicy) {
- super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
+ super(entityId, jobId, state, JobType.FEED_CONNECT, spec);
this.connectionId = connectionId;
this.sourceFeedJoint = sourceFeedJoint;
this.computeFeedJoint = computeFeedJoint;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
index 3b11811..4114e82 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
@@ -20,27 +20,31 @@ package org.apache.asterix.external.feed.watch;
import java.util.List;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedUtils.JobType;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-public class FeedIntakeInfo extends FeedJobInfo {
+public class FeedIntakeInfo extends ActiveJob {
- private final FeedId feedId;
+ private static final long serialVersionUID = 1L;
+ private final EntityId feedId;
private final IFeedJoint intakeFeedJoint;
private final JobSpecification spec;
private List<String> intakeLocation;
- public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
+ public FeedIntakeInfo(JobId jobId, ActivityState state, EntityId feedId, IFeedJoint intakeFeedJoint,
JobSpecification spec) {
- super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
+ super(feedId, jobId, state, JobType.INTAKE, spec);
this.feedId = feedId;
this.intakeFeedJoint = intakeFeedJoint;
this.spec = spec;
}
- public FeedId getFeedId() {
+ public EntityId getFeedId() {
return feedId;
}
@@ -48,6 +52,7 @@ public class FeedIntakeInfo extends FeedJobInfo {
return intakeFeedJoint;
}
+ @Override
public JobSpecification getSpec() {
return spec;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
deleted file mode 100644
index 92e00cb..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedJobInfo {
-
- private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
-
- public enum JobType {
- INTAKE,
- FEED_CONNECT
- }
-
- public enum FeedJobState {
- CREATED,
- ACTIVE,
- UNDER_RECOVERY,
- ENDED
- }
-
- protected final JobId jobId;
- protected final JobType jobType;
- protected FeedJobState state;
- protected JobSpecification spec;
-
- public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
- this.jobId = jobId;
- this.state = state;
- this.jobType = jobType;
- this.spec = spec;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public FeedJobState getState() {
- return state;
- }
-
- public void setState(FeedJobState state) {
- this.state = state;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(this + " is in " + state + " state.");
- }
- }
-
- public JobType getJobType() {
- return jobType;
- }
-
- public JobSpecification getSpec() {
- return spec;
- }
-
- public void setSpec(JobSpecification spec) {
- this.spec = spec;
- }
-
- public String toString() {
- return jobId + " [" + jobType + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 36098ee..84c2cb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -20,13 +20,13 @@ package org.apache.asterix.external.operators;
import java.util.Map;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -56,13 +56,13 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
private final Map<String, String> feedPolicyProperties;
/** The source feed from which the feed derives its data from. **/
- private final FeedId sourceFeedId;
+ private final EntityId sourceFeedId;
/** The subscription location at which the recipient feed receives tuples from the source feed {SOURCE_FEED_INTAKE_STAGE , SOURCE_FEED_COMPUTE_STAGE} **/
private final FeedRuntimeType subscriptionLocation;
- public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
- ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+ public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
+ EntityId sourceFeedId, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
FeedRuntimeType subscriptionLocation) {
super(spec, 0, 1);
this.recordDescriptors[0] = rDesc;
@@ -77,13 +77,11 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+ ActiveManager feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject()).getFeedManager();
- FeedRuntimeId sourceRuntimeId =
- new FeedRuntimeId(sourceFeedId, subscriptionLocation, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
- ISubscribableRuntime sourceRuntime = feedManager.getSubscribableRuntime(sourceRuntimeId);
- return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties, partition,
- nPartitions, sourceRuntime);
+ ActiveRuntimeId sourceRuntimeId = new ActiveRuntimeId(sourceFeedId, subscriptionLocation.toString(), partition);
+ ISubscribableRuntime sourceRuntime = (ISubscribableRuntime) feedManager.getSubscribableRuntime(sourceRuntimeId);
+ return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition, sourceRuntime);
}
public FeedConnectionId getFeedConnectionId() {
@@ -102,7 +100,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
return recordDescriptors[0];
}
- public FeedId getSourceFeedId() {
+ public EntityId getSourceFeedId() {
return sourceFeedId;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index aeea6ba..231fe99 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -20,19 +20,18 @@ package org.apache.asterix.external.operators;
import java.util.Map;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -47,46 +46,46 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
private final FeedConnectionId connectionId;
private final Map<String, String> feedPolicy;
private final FeedPolicyAccessor policyAccessor;
- private final FeedManager feedManager;
+ private final ActiveManager feedManager;
private final ISubscribableRuntime sourceRuntime;
private final IHyracksTaskContext ctx;
private CollectionRuntime collectRuntime;
- public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
- FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
- ISubscribableRuntime sourceRuntime) {
+ public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicy, int partition, ISubscribableRuntime sourceRuntime) {
this.ctx = ctx;
this.partition = partition;
this.connectionId = feedConnectionId;
this.sourceRuntime = sourceRuntime;
this.feedPolicy = feedPolicy;
this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
- this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
}
@Override
public void initialize() throws HyracksDataException {
try {
- FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT, partition,
- FeedRuntimeId.DEFAULT_TARGET_ID);
+ ActiveRuntimeId runtimeId =
+ new ActiveRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT.toString(), partition);
// Does this collector have a handler?
FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
if (policyAccessor.bufferingEnabled()) {
writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
- feedManager.getFeedMemoryManager());
+ feedManager.getFramePool());
} else {
writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor);
}
collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx,
new FeedFrameCollector(policyAccessor, writer, connectionId));
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+ feedManager.getActiveRuntimeRegistry().registerRuntime(collectRuntime);
sourceRuntime.subscribe(collectRuntime);
// Notify CC that Collection started
ctx.sendApplicationMessageToCC(
- new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()), null);
+ new ActivePartitionMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId(), null),
+ null);
collectRuntime.waitTillCollectionOver();
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
+ feedManager.getActiveRuntimeRegistry().deregisterRuntime(collectRuntime.getRuntimeId());
} catch (Exception e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 69aa59a..f4ea60f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -21,11 +21,12 @@ package org.apache.asterix.external.operators;
import java.util.Map;
import java.util.logging.Logger;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -47,7 +48,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
/** The unique identifier of the feed that is being ingested. **/
- private final FeedId feedId;
+ private final EntityId feedId;
private final FeedPolicyAccessor policyAccessor;
@@ -71,7 +72,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
- this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+ this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(),
+ primaryFeed.getFeedName());
this.adaptorFactory = adapterFactory;
this.adapterOutputType = adapterOutputType;
this.policyAccessor = policyAccessor;
@@ -82,7 +84,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor,
RecordDescriptor rDesc) {
super(spec, 0, 1);
- this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+ this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(),
+ primaryFeed.getFeedName());
this.adaptorFactoryClassName = adapterFactoryClassName;
this.adaptorLibraryName = adapterLibraryName;
this.adaptorConfiguration = primaryFeed.getAdapterConfiguration();
@@ -124,7 +127,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
return adapterFactory;
}
- public FeedId getFeedId() {
+ public EntityId getFeedId() {
return feedId;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 04ef016..ffa451b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,22 +18,21 @@
*/
package org.apache.asterix.external.operators;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActivePartitionMessage;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
@@ -43,13 +42,13 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
*/
public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
- private final FeedId feedId;
+ private final EntityId feedId;
private final int partition;
private final IHyracksTaskContext ctx;
private final IAdapterFactory adapterFactory;
private final FeedIntakeOperatorDescriptor opDesc;
- public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider,
FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
this.opDesc = feedIntakeOperatorDescriptor;
@@ -62,7 +61,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
@Override
public void initialize() throws HyracksDataException {
- FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+ ActiveManager feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject()).getFeedManager();
AdapterRuntimeManager adapterRuntimeManager = null;
DistributeFeedFrameWriter frameDistributor = null;
@@ -73,17 +72,15 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
// create the adapter
FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
// create the distributor
- frameDistributor = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
- new FrameTupleAccessor(recordDesc));
+ frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition);
// create adapter runtime manager
adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition);
// create and register the runtime
- FeedRuntimeId runtimeId =
- new FeedRuntimeId(feedId, FeedRuntimeType.INTAKE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+ ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition);
ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);
- feedManager.registerFeedSubscribableRuntime(ingestionRuntime);
+ feedManager.registerRuntime(ingestionRuntime);
// Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
- ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
+ ctx.sendApplicationMessageToCC(new ActivePartitionMessage(feedId, ctx.getJobletContext().getJobId(), null),
null);
// open the distributor
open = true;
@@ -95,7 +92,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
}
}
// The ingestion is over. we need to remove the runtime from the manager
- feedManager.deregisterFeedSubscribableRuntime(ingestionRuntime.getRuntimeId());
+ feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
// If there was a failure, we need to throw an exception
if (adapterRuntimeManager.isFailed()) {
throw new HyracksDataException("Unable to ingest data");
@@ -108,7 +105,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
*/
if (ingestionRuntime != null) {
ingestionRuntime.terminate();
- feedManager.deregisterFeedSubscribableRuntime(ingestionRuntime.getRuntimeId());
+ feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId());
}
throw new HyracksDataException(ie);
} finally {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
index 219110f..61451b1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.operators;
-import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.active.IActiveMessage;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -28,17 +28,20 @@ import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
/**
- * Sends a control message to the registered message queue for feed specified by its feedId.
+ * @deprecated
+ * Sends a control message to the registered message queue for feed specified by its feedId.
+ * For messaging, use IMessageBroker interfaces
*/
+@Deprecated
public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final FeedConnectionId connectionId;
- private final IFeedMessage feedMessage;
+ private final IActiveMessage feedMessage;
public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
- IFeedMessage feedMessage) {
+ IActiveMessage feedMessage) {
super(spec, 0, 1);
this.connectionId = connectionId;
this.feedMessage = feedMessage;
@@ -47,7 +50,7 @@ public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperato
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition, nPartitions);
+ return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 82bf1da..5f92327 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -21,46 +21,49 @@ package org.apache.asterix.external.operators;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveMessage;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.management.FeedManager;
import org.apache.asterix.external.feed.message.EndFeedMessage;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
- * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
- * a feed message to the local feed manager on the host node controller.
+ * @deprecated
+ * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
+ * a feed message to the local feed manager on the host node controller.
+ * For messages, use IMessageBroker interfaces
* @see FeedMessageOperatorDescriptor
* IFeedMessage
* IFeedManager
*/
+@Deprecated
public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
private final FeedConnectionId connectionId;
- private final IFeedMessage message;
- private final FeedManager feedManager;
+ private final IActiveMessage message;
+ private final ActiveManager feedManager;
private final int partition;
public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
- IFeedMessage feedMessage, int partition, int nPartitions) {
+ IActiveMessage feedMessage, int partition) {
this.connectionId = connectionId;
this.message = feedMessage;
this.partition = partition;
IAsterixAppRuntimeContext runtimeCtx =
(IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
- this.feedManager = (FeedManager) runtimeCtx.getFeedManager();
+ this.feedManager = (ActiveManager) runtimeCtx.getFeedManager();
}
@Override
@@ -77,6 +80,8 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
case DISCONTINUE_SOURCE:
handleDiscontinueFeedTypeMessage(endFeedMessage);
break;
+ default:
+ break;
}
break;
default:
@@ -90,10 +95,11 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
}
private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
- FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
- FeedRuntimeId subscribableRuntimeId =
- new FeedRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
- ISubscribableRuntime feedRuntime = feedManager.getSubscribableRuntime(subscribableRuntimeId);
+ EntityId sourceFeedId = endFeedMessage.getSourceFeedId();
+ ActiveRuntimeId subscribableRuntimeId =
+ new ActiveRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE.toString(), partition);
+ ISubscribableRuntime feedRuntime =
+ (ISubscribableRuntime) feedManager.getSubscribableRuntime(subscribableRuntimeId);
AdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
adapterRuntimeManager.stop();
if (LOGGER.isLoggable(Level.INFO)) {
@@ -105,12 +111,12 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
}
- FeedRuntimeId runtimeId = null;
+ ActiveRuntimeId runtimeId;
FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
if (endFeedMessage.isCompleteDisconnection()) {
// subscribableRuntimeType represents the location at which the feed connection receives
// data
- FeedRuntimeType runtimeType = null;
+ FeedRuntimeType runtimeType;
switch (subscribableRuntimeType) {
case INTAKE:
runtimeType = FeedRuntimeType.COLLECT;
@@ -122,10 +128,9 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
}
- runtimeId = new FeedRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType, partition,
- FeedRuntimeId.DEFAULT_TARGET_ID);
+ runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType.toString(), partition);
CollectionRuntime feedRuntime =
- (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+ (CollectionRuntime) feedManager.getActiveRuntimeRegistry().getRuntime(runtimeId);
if (feedRuntime != null) {
feedRuntime.getSourceRuntime().unsubscribe(feedRuntime);
}
@@ -142,11 +147,14 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
throw new IllegalStateException("Illegal State, invalid runtime type " + subscribableRuntimeType);
case COMPUTE:
// feed could be primary or secondary, doesn't matter
- FeedRuntimeId feedSubscribableRuntimeId = new FeedRuntimeId(connectionId.getFeedId(),
- FeedRuntimeType.COMPUTE, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
- ISubscribableRuntime feedRuntime = feedManager.getSubscribableRuntime(feedSubscribableRuntimeId);
- CollectionRuntime feedCollectionRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
- .getFeedRuntime(connectionId, runtimeId);
+ ActiveRuntimeId feedSubscribableRuntimeId = new ActiveRuntimeId(connectionId.getFeedId(),
+ FeedRuntimeType.COMPUTE.toString(), partition);
+ ISubscribableRuntime feedRuntime =
+ (ISubscribableRuntime) feedManager.getSubscribableRuntime(feedSubscribableRuntimeId);
+ runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(),
+ FeedRuntimeType.COMPUTE_COLLECT.toString(), partition);
+ CollectionRuntime feedCollectionRuntime =
+ (CollectionRuntime) feedManager.getActiveRuntimeRegistry().getRuntime(runtimeId);
feedRuntime.unsubscribe(feedCollectionRuntime);
break;
default:
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 8d8bc28..54e17ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -23,17 +23,17 @@ import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedManager;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
@@ -63,7 +63,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
* The Feed Runtime instance associated with the operator. Feed Runtime
* captures the state of the operator while the feed is active.
*/
- private FeedRuntime feedRuntime;
+ private ActiveRuntime feedRuntime;
/**
* A unique identifier for the feed instance. A feed instance represents
@@ -78,7 +78,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
private int partition;
/** The (singleton) instance of IFeedManager **/
- private FeedManager feedManager;
+ private ActiveManager feedManager;
private FrameTupleAccessor fta;
@@ -109,7 +109,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
this.partition = partition;
this.connectionId = feedConnectionId;
- this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
this.message = new VSizeFrame(ctx);
ctx.setSharedObject(message);
@@ -119,8 +119,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
@Override
public void open() throws HyracksDataException {
- FeedRuntimeId runtimeId =
- new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+ ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString(), partition);
try {
initializeNewFeedRuntime(runtimeId);
opened = true;
@@ -131,18 +130,18 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
}
}
- private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws Exception {
fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
FeedPolicyAccessor fpa = policyEnforcer.getFeedPolicyAccessor();
coreOperator.setOutputFrameWriter(0, writer, recordDesc);
if (fpa.bufferingEnabled()) {
writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator, fpa, fta,
- feedManager.getFeedMemoryManager());
+ feedManager.getFramePool());
} else {
writer = new SyncFeedRuntimeInputHandler(ctx, coreOperator, fta);
}
- feedRuntime = new FeedRuntime(runtimeId);
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+ feedRuntime = new ActiveRuntime(runtimeId);
+ feedManager.getActiveRuntimeRegistry().registerRuntime(feedRuntime);
}
@Override
@@ -173,7 +172,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
}
private void deregister() {
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+ feedManager.getActiveRuntimeRegistry().deregisterRuntime(feedRuntime.getRuntimeId());
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index c7dd3d2..908601d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -20,8 +20,8 @@ package org.apache.asterix.external.operators;
import java.util.Map;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 47df39e..6f679f7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -23,17 +23,17 @@ import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntime;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedManager;
import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -61,7 +61,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
* The Feed Runtime instance associated with the operator. Feed Runtime
* captures the state of the operator while the feed is active.
*/
- private FeedRuntime feedRuntime;
+ private ActiveRuntime feedRuntime;
/**
* A unique identifier for the feed instance. A feed instance represents
@@ -79,7 +79,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
/** The (singleton) instance of IFeedManager **/
- private final FeedManager feedManager;
+ private final ActiveManager feedManager;
private FrameTupleAccessor fta;
@@ -103,7 +103,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
this.partition = partition;
this.connectionId = feedConnectionId;
- this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
this.targetId = targetId;
this.message = new VSizeFrame(ctx);
@@ -114,7 +114,8 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
@Override
public void open() throws HyracksDataException {
- FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, targetId);
+ ActiveRuntimeId runtimeId =
+ new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition);
try {
initializeNewFeedRuntime(runtimeId);
insertOperator.open();
@@ -124,7 +125,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
}
}
- private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+ private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws Exception {
fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0));
insertOperator.setOutputFrameWriter(0, writer, recordDesc);
if (insertOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
@@ -138,7 +139,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
}
if (policyEnforcer.getFeedPolicyAccessor().bufferingEnabled()) {
writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, insertOperator,
- policyEnforcer.getFeedPolicyAccessor(), fta, feedManager.getFeedMemoryManager());
+ policyEnforcer.getFeedPolicyAccessor(), fta, feedManager.getFramePool());
} else {
writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta);
}
@@ -146,9 +147,10 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
}
private void setupBasicRuntime(IFrameWriter frameWriter) throws Exception {
- FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), runtimeType, partition, targetId);
- feedRuntime = new FeedRuntime(runtimeId);
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+ ActiveRuntimeId runtimeId =
+ new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition);
+ feedRuntime = new ActiveRuntime(runtimeId);
+ feedManager.getActiveRuntimeRegistry().registerRuntime(feedRuntime);
}
@Override
@@ -177,7 +179,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
}
private void deregister() {
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+ feedManager.getActiveRuntimeRegistry().deregisterRuntime(feedRuntime.getRuntimeId());
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 8228c39..6b7eb31 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -42,6 +42,29 @@ import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.util.IntSerDeUtils;
public class FeedUtils {
+
+ public enum JobType {
+ INTAKE,
+ FEED_CONNECT
+ }
+
+ public enum FeedRuntimeType {
+ INTAKE,
+ COLLECT,
+ COMPUTE_COLLECT,
+ COMPUTE,
+ STORE,
+ OTHER,
+ ETS,
+ JOIN
+ }
+
+ public enum Mode {
+ PROCESS, // There is memory
+ SPILL, // Memory budget has been consumed. Now we're writing to disk
+ DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding
+ }
+
private static String prepareDataverseFeedName(String dataverseName, String feedName) {
return dataverseName + File.separator + feedName;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
index 49042b8..0f6a2ea 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -24,9 +24,9 @@ import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.asterix.active.ConcurrentFramePool;
+import org.apache.asterix.active.FrameAction;
import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.dataflow.FrameAction;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.junit.Assert;
import org.mockito.Mockito;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index bc1c328..e643206 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -25,13 +25,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ConcurrentFramePool;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -72,10 +72,9 @@ public class InputHandlerTest extends TestCase {
private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
- FeedId feedId = new FeedId(DATAVERSE, FEED);
+ EntityId feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, DATAVERSE, FEED);
FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
- FeedRuntimeId runtimeId =
- new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_TARGET_ID);
+ ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0);
return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 092bf69..14f229a 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -24,11 +24,11 @@ import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedActivity;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -52,7 +52,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
* Represents the AQL statement for subscribing to a feed.
* This AQL statement is private and may not be used by the end-user.
*/
-public class SubscribeFeedStatement implements Statement {
+public class SubscribeFeedStatement extends Statement {
private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
private final FeedConnectionRequest connectionRequest;
@@ -71,10 +71,10 @@ public class SubscribeFeedStatement implements Statement {
public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
this.query = new Query();
- FeedId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
- Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
- connectionRequest.getReceivingFeedId().getDataverse(),
- connectionRequest.getReceivingFeedId().getFeedName());
+ EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
+ Feed subscriberFeed =
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
+ connectionRequest.getReceivingFeedId().getEntityName());
if (subscriberFeed == null) {
throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
}
@@ -100,8 +100,9 @@ public class SubscribeFeedStatement implements Statement {
builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'"
- + sourceFeedId.getFeedName() + "'" + "," + "'" + connectionRequest.getReceivingFeedId().getFeedName()
- + "'" + "," + "'" + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
+ + sourceFeedId.getEntityName() + "'" + "," + "'"
+ + connectionRequest.getReceivingFeedId().getEntityName() + "'" + "," + "'"
+ + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
+ connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")");
List<String> functionsToApply = connectionRequest.getFunctionsToApply();
@@ -156,8 +157,8 @@ public class SubscribeFeedStatement implements Statement {
}
@Override
- public Kind getKind() {
- return Kind.SUBSCRIBE_FEED;
+ public byte getKind() {
+ return Statement.SUBSCRIBE_FEED;
}
public String getPolicy() {
@@ -179,8 +180,8 @@ public class SubscribeFeedStatement implements Statement {
private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException {
String outputType = null;
- FeedId feedId = connectionRequest.getReceivingFeedId();
- Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName());
+ EntityId feedId = connectionRequest.getReceivingFeedId();
+ Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName());
FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters());
try {
switch (feed.getFeedType()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
index 2fc4b14..f30d4a6 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java
@@ -70,7 +70,7 @@ public abstract class RangeMapBuilder {
}
// Translate the query into a Range Map
- if (hintStatements.get(0).getKind() != Statement.Kind.QUERY) {
+ if (hintStatements.get(0).getKind() != Statement.QUERY) {
throw new AsterixException("Not a proper query for the range hint.");
}
Query q = (Query) hintStatements.get(0);
@@ -151,8 +151,8 @@ public abstract class RangeMapBuilder {
int fieldIndex = 0;
int fieldType = rangeMap.getTag(0, 0);
AqlBinaryComparatorFactoryProvider comparatorFactory = AqlBinaryComparatorFactoryProvider.INSTANCE;
- IBinaryComparatorFactory bcf = comparatorFactory
- .getBinaryComparatorFactory(ATypeTag.VALUE_TYPE_MAPPING[fieldType], ascending);
+ IBinaryComparatorFactory bcf =
+ comparatorFactory.getBinaryComparatorFactory(ATypeTag.VALUE_TYPE_MAPPING[fieldType], ascending);
IBinaryComparator comparator = bcf.createBinaryComparator();
int c = 0;
for (int split = 1; split < rangeMap.getSplitCount(); ++split) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 3184d1e..6f0f1f1 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -18,45 +18,52 @@
*/
package org.apache.asterix.lang.common.base;
-public interface Statement extends ILangExpression {
- public enum Kind {
- DATASET_DECL,
- DATAVERSE_DECL,
- DATAVERSE_DROP,
- DATASET_DROP,
- DELETE,
- INSERT,
- UPSERT,
- UPDATE,
- DML_CMD_LIST,
- FUNCTION_DECL,
- LOAD,
- NODEGROUP_DECL,
- NODEGROUP_DROP,
- QUERY,
- SET,
- TYPE_DECL,
- TYPE_DROP,
- WRITE,
- CREATE_INDEX,
- INDEX_DECL,
- CREATE_DATAVERSE,
- INDEX_DROP,
- CREATE_PRIMARY_FEED,
- CREATE_SECONDARY_FEED,
- DROP_FEED,
- CONNECT_FEED,
- DISCONNECT_FEED,
- SUBSCRIBE_FEED,
- CREATE_FEED_POLICY,
- DROP_FEED_POLICY,
- CREATE_FUNCTION,
- FUNCTION_DROP,
- COMPACT,
- EXTERNAL_DATASET_REFRESH,
- RUN
- }
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
- public abstract Kind getKind();
+public abstract class Statement implements ILangExpression {
+ public static final byte DATASET_DECL = 0x00;
+ public static final byte DATAVERSE_DECL = 0x01;
+ public static final byte DATAVERSE_DROP = 0x02;
+ public static final byte DATASET_DROP = 0x03;
+ public static final byte DELETE = 0x04;
+ public static final byte INSERT = 0x05;
+ public static final byte UPSERT = 0x06;
+ public static final byte UPDATE = 0x07;
+ public static final byte DML_CMD_LIST = 0x08;
+ public static final byte FUNCTION_DECL = 0x09;
+ public static final byte LOAD = 0x0a;
+ public static final byte NODEGROUP_DECL = 0x0b;
+ public static final byte NODEGROUP_DROP = 0x0c;
+ public static final byte QUERY = 0x0d;
+ public static final byte SET = 0x0e;
+ public static final byte TYPE_DECL = 0x0f;
+ public static final byte TYPE_DROP = 0x10;
+ public static final byte WRITE = 0x11;
+ public static final byte CREATE_INDEX = 0x12;
+ public static final byte INDEX_DECL = 0x13;
+ public static final byte CREATE_DATAVERSE = 0x14;
+ public static final byte INDEX_DROP = 0x15;
+ public static final byte CREATE_PRIMARY_FEED = 0x16;
+ public static final byte CREATE_SECONDARY_FEED = 0x17;
+ public static final byte DROP_FEED = 0x18;
+ public static final byte CONNECT_FEED = 0x19;
+ public static final byte DISCONNECT_FEED = 0x1a;
+ public static final byte SUBSCRIBE_FEED = 0x1b;
+ public static final byte CREATE_FEED_POLICY = 0x1c;
+ public static final byte DROP_FEED_POLICY = 0x1d;
+ public static final byte CREATE_FUNCTION = 0x1e;
+ public static final byte FUNCTION_DROP = 0x1f;
+ public static final byte COMPACT = 0x20;
+ public static final byte EXTERNAL_DATASET_REFRESH = 0x21;
+ public static final byte RUN = 0x22;
+ public static final List<Byte> VALUES = Collections.unmodifiableList(
+ Arrays.asList(DATASET_DECL, DATAVERSE_DECL, DATAVERSE_DROP, DATASET_DROP, DELETE, INSERT, UPSERT, UPDATE,
+ DML_CMD_LIST, FUNCTION_DECL, LOAD, NODEGROUP_DECL, NODEGROUP_DROP, QUERY, SET, TYPE_DECL, TYPE_DROP,
+ WRITE, CREATE_INDEX, INDEX_DECL, CREATE_DATAVERSE, INDEX_DROP, CREATE_PRIMARY_FEED,
+ CREATE_SECONDARY_FEED, DROP_FEED, CONNECT_FEED, DISCONNECT_FEED, SUBSCRIBE_FEED, CREATE_FEED_POLICY,
+ DROP_FEED_POLICY, CREATE_FUNCTION, FUNCTION_DROP, COMPACT, EXTERNAL_DATASET_REFRESH, RUN));
+ public abstract byte getKind();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
index 93d151c..531957f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CompactStatement.java
@@ -23,7 +23,7 @@ import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-public class CompactStatement implements Statement {
+public class CompactStatement extends Statement {
private final Identifier dataverseName;
private final Identifier datasetName;
@@ -34,8 +34,8 @@ public class CompactStatement implements Statement {
}
@Override
- public Kind getKind() {
- return Kind.COMPACT;
+ public byte getKind() {
+ return Statement.COMPACT;
}
public Identifier getDataverseName() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index b4208b9..33e3340 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -25,7 +25,7 @@ import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.hyracks.algebricks.common.utils.Pair;
-public class ConnectFeedStatement implements Statement {
+public class ConnectFeedStatement extends Statement {
private final Identifier dataverseName;
private final Identifier datasetName;
@@ -77,8 +77,8 @@ public class ConnectFeedStatement implements Statement {
}
@Override
- public Kind getKind() {
- return Kind.CONNECT_FEED;
+ public byte getKind() {
+ return Statement.CONNECT_FEED;
}
public String getPolicy() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index 1eb8372..820ae5f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -23,7 +23,7 @@ import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-public class CreateDataverseStatement implements Statement {
+public class CreateDataverseStatement extends Statement {
private Identifier dataverseName;
private String format;
@@ -31,10 +31,11 @@ public class CreateDataverseStatement implements Statement {
public CreateDataverseStatement(Identifier dataverseName, String format, boolean ifNotExists) {
this.dataverseName = dataverseName;
- if (format == null)
+ if (format == null) {
this.format = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
- else
+ } else {
this.format = format;
+ }
this.ifNotExists = ifNotExists;
}
@@ -51,8 +52,8 @@ public class CreateDataverseStatement implements Statement {
}
@Override
- public Kind getKind() {
- return Kind.CREATE_DATAVERSE;
+ public byte getKind() {
+ return Statement.CREATE_DATAVERSE;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
index bd3192c..e972cad 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedPolicyStatement.java
@@ -24,7 +24,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-public class CreateFeedPolicyStatement implements Statement {
+public class CreateFeedPolicyStatement extends Statement {
private final String policyName;
private final String sourcePolicyName;
@@ -58,8 +58,8 @@ public class CreateFeedPolicyStatement implements Statement {
}
@Override
- public Kind getKind() {
- return Statement.Kind.CREATE_FEED_POLICY;
+ public byte getKind() {
+ return Statement.CREATE_FEED_POLICY;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
index 53d05d2..9635836 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java
@@ -25,7 +25,7 @@ import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.hyracks.algebricks.common.utils.Pair;
-public abstract class CreateFeedStatement implements Statement {
+public abstract class CreateFeedStatement extends Statement {
private final Pair<Identifier, Identifier> qName;
private final FunctionSignature appliedFunction;
@@ -55,9 +55,6 @@ public abstract class CreateFeedStatement implements Statement {
}
@Override
- public abstract Kind getKind();
-
- @Override
public abstract <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException;
}