You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 13:34:09 UTC
[7/7] asterixdb git commit: Refactor General Active Classes
Refactor General Active Classes
This change is the first of a series of changes aiming to refactor
feed related classes into generic active and feed specific.
In this change, we introduce asterix-active which will be one level
below asterix-external-data. The ultimate goal is to have a very
flexible framework for long running activities within asterix.
Change-Id: I0a8f33ee5b45c5e090b08c24a102e369aae43c04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/977
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/973a0d34
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/973a0d34
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/973a0d34
Branch: refs/heads/master
Commit: 973a0d34e36de2fb950f4979843b3bef9f1f992d
Parents: 3044e2e
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Fri Jul 22 14:38:56 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Jul 22 06:32:59 2016 -0700
----------------------------------------------------------------------
asterixdb/asterix-active/pom.xml | 45 ++
.../org/apache/asterix/active/ActiveEvent.java | 68 ++
.../org/apache/asterix/active/ActiveJob.java | 92 +++
.../active/ActiveJobNotificationHandler.java | 194 ++++++
.../apache/asterix/active/ActiveManager.java | 70 ++
.../asterix/active/ActivePartitionMessage.java | 55 ++
.../apache/asterix/active/ActiveRuntime.java | 39 ++
.../apache/asterix/active/ActiveRuntimeId.java | 77 +++
.../asterix/active/ActiveRuntimeManager.java | 84 +++
.../asterix/active/ActiveRuntimeRegistry.java | 79 +++
.../org/apache/asterix/active/Activity.java | 80 +++
.../apache/asterix/active/ActivityState.java | 28 +
.../asterix/active/ConcurrentFramePool.java | 292 ++++++++
.../org/apache/asterix/active/EntityId.java | 76 +++
.../org/apache/asterix/active/FrameAction.java | 70 ++
.../active/IActiveEntityEventsListener.java | 34 +
.../apache/asterix/active/IActiveMessage.java | 44 ++
.../apache/asterix/active/IActiveRuntime.java | 28 +
.../asterix/active/IActiveRuntimeRegistry.java | 49 ++
.../optimizer/rules/UnnestToDataScanRule.java | 2 +-
.../translator/AbstractLangTranslator.java | 18 +-
.../asterix/translator/CompiledStatements.java | 66 +-
.../LangExpressionToPlanTranslator.java | 403 +++++------
.../apache/asterix/api/common/APIFramework.java | 20 +-
.../api/common/AsterixAppRuntimeContext.java | 37 +-
.../asterix/api/http/servlet/AQLAPIServlet.java | 15 +-
.../asterix/api/http/servlet/DDLAPIServlet.java | 17 +-
.../asterix/api/http/servlet/FeedServlet.java | 6 +-
.../api/http/servlet/QueryAPIServlet.java | 10 +-
.../api/http/servlet/RESTAPIServlet.java | 7 +-
.../api/http/servlet/UpdateAPIServlet.java | 10 +-
.../app/external/ActiveLifecycleListener.java | 82 +++
.../external/FeedJobNotificationHandler.java | 679 -------------------
.../apache/asterix/app/external/FeedJoint.java | 27 +-
.../app/external/FeedLifecycleListener.java | 228 -------
.../asterix/app/external/FeedOperations.java | 42 +-
.../app/external/FeedWorkCollection.java | 5 +-
.../asterix/aql/translator/QueryTranslator.java | 656 +++++++++---------
.../bootstrap/CCApplicationEntryPoint.java | 5 +-
.../asterix/messaging/CCMessageBroker.java | 13 +-
.../asterix/test/sqlpp/ParserTestExecutor.java | 7 +-
.../messaging/api/IApplicationMessage.java | 2 +-
asterixdb/asterix-external-data/pom.xml | 25 +-
.../external/feed/api/FeedOperationCounter.java | 12 +-
.../apache/asterix/external/feed/api/IFeed.java | 4 +-
.../feed/api/IFeedConnectionManager.java | 75 --
.../asterix/external/feed/api/IFeedJoint.java | 9 +-
.../feed/api/IFeedLifecycleListener.java | 49 --
.../asterix/external/feed/api/IFeedMessage.java | 41 --
.../asterix/external/feed/api/IFeedRuntime.java | 47 --
.../external/feed/api/ISubscribableRuntime.java | 3 +-
.../dataflow/DistributeFeedFrameWriter.java | 12 +-
.../feed/dataflow/FeedRuntimeInputHandler.java | 9 +-
.../external/feed/dataflow/FrameAction.java | 70 --
.../feed/management/ConcurrentFramePool.java | 293 --------
.../feed/management/FeedCollectInfo.java | 52 --
.../feed/management/FeedConnectionId.java | 13 +-
.../feed/management/FeedConnectionManager.java | 107 ---
.../feed/management/FeedConnectionRequest.java | 14 +-
.../feed/management/FeedEventsListener.java | 644 ++++++++++++++++++
.../external/feed/management/FeedId.java | 66 --
.../external/feed/management/FeedJointKey.java | 14 +-
.../external/feed/management/FeedManager.java | 97 ---
.../feed/management/FeedRuntimeManager.java | 83 ---
.../external/feed/message/EndFeedMessage.java | 22 +-
.../external/feed/message/FeedMessage.java | 4 +-
.../feed/message/FeedPartitionStartMessage.java | 48 --
.../feed/runtime/AdapterRuntimeManager.java | 8 +-
.../feed/runtime/CollectionRuntime.java | 17 +-
.../external/feed/runtime/FeedRuntime.java | 41 --
.../external/feed/runtime/FeedRuntimeId.java | 88 ---
.../external/feed/runtime/IngestionRuntime.java | 5 +-
.../feed/runtime/SubscribableRuntime.java | 16 +-
.../external/feed/watch/FeedConnectJobInfo.java | 11 +-
.../external/feed/watch/FeedIntakeInfo.java | 17 +-
.../external/feed/watch/FeedJobInfo.java | 86 ---
.../FeedCollectOperatorDescriptor.java | 26 +-
.../FeedCollectOperatorNodePushable.java | 31 +-
.../operators/FeedIntakeOperatorDescriptor.java | 13 +-
.../FeedIntakeOperatorNodePushable.java | 31 +-
.../FeedMessageOperatorDescriptor.java | 13 +-
.../FeedMessageOperatorNodePushable.java | 58 +-
.../operators/FeedMetaComputeNodePushable.java | 27 +-
.../operators/FeedMetaOperatorDescriptor.java | 2 +-
.../operators/FeedMetaStoreNodePushable.java | 30 +-
.../apache/asterix/external/util/FeedUtils.java | 23 +
.../feed/test/ConcurrentFramePoolUnitTest.java | 4 +-
.../external/feed/test/InputHandlerTest.java | 13 +-
.../aql/statement/SubscribeFeedStatement.java | 25 +-
.../asterix/lang/aql/util/RangeMapBuilder.java | 6 +-
.../asterix/lang/common/base/Statement.java | 85 +--
.../lang/common/statement/CompactStatement.java | 6 +-
.../common/statement/ConnectFeedStatement.java | 6 +-
.../statement/CreateDataverseStatement.java | 11 +-
.../statement/CreateFeedPolicyStatement.java | 6 +-
.../common/statement/CreateFeedStatement.java | 5 +-
.../statement/CreateFunctionStatement.java | 6 +-
.../common/statement/CreateIndexStatement.java | 6 +-
.../statement/CreatePrimaryFeedStatement.java | 6 +-
.../statement/CreateSecondaryFeedStatement.java | 6 +-
.../lang/common/statement/DatasetDecl.java | 6 +-
.../lang/common/statement/DataverseDecl.java | 6 +-
.../statement/DataverseDropStatement.java | 6 +-
.../lang/common/statement/DeleteStatement.java | 6 +-
.../statement/DisconnectFeedStatement.java | 6 +-
.../lang/common/statement/DropStatement.java | 6 +-
.../common/statement/FeedDropStatement.java | 6 +-
.../statement/FeedPolicyDropStatement.java | 6 +-
.../lang/common/statement/FunctionDecl.java | 6 +-
.../common/statement/FunctionDropStatement.java | 6 +-
.../lang/common/statement/IndexDecl.java | 6 +-
.../common/statement/IndexDropStatement.java | 6 +-
.../lang/common/statement/InsertStatement.java | 6 +-
.../lang/common/statement/LoadStatement.java | 6 +-
.../statement/NodeGroupDropStatement.java | 6 +-
.../lang/common/statement/NodegroupDecl.java | 6 +-
.../asterix/lang/common/statement/Query.java | 6 +-
.../RefreshExternalDatasetStatement.java | 6 +-
.../lang/common/statement/RunStatement.java | 6 +-
.../lang/common/statement/SetStatement.java | 6 +-
.../asterix/lang/common/statement/TypeDecl.java | 6 +-
.../common/statement/TypeDropStatement.java | 6 +-
.../lang/common/statement/UpdateStatement.java | 6 +-
.../lang/common/statement/UpsertStatement.java | 7 +-
.../lang/common/statement/WriteStatement.java | 6 +-
.../metadata/declared/FeedDataSource.java | 13 +-
.../apache/asterix/metadata/entities/Feed.java | 11 +-
.../metadata/feeds/FeedMetadataUtil.java | 48 +-
.../tools/translator/ADGenDmlTranslator.java | 12 +-
asterixdb/pom.xml | 1 +
.../ScalarFunctionCallExpression.java | 2 +-
131 files changed, 3411 insertions(+), 3235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
new file mode 100644
index 0000000..140d67a
--- /dev/null
+++ b/asterixdb/asterix-active/pom.xml
@@ -0,0 +1,45 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.asterix</groupId>
+    <artifactId>apache-asterixdb</artifactId>
+    <version>0.8.9-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-active</artifactId>
+  <!-- Sibling org.apache.asterix modules are referenced with ${project.version}. This module
+       declares no version of its own, so the property resolves to the parent version above
+       and the literal is stated only once in this file. -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <!-- NOTE(review): this version is hard-coded while hyracks-hdfs-core below uses
+         ${hyracks.version}; confirm whether the two are meant to stay in step. -->
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-test-support</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-runtime</artifactId>
+      <version>${project.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-compiler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-hdfs-core</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
new file mode 100644
index 0000000..e5ccd05
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Describes a single event raised for a job: the job it belongs to, the kind of event, and
+ * optionally the entity concerned and a serializable payload. All fields are final and are
+ * assigned once in the constructor.
+ */
+public class ActiveEvent {
+
+    /** The kinds of event this class can carry. */
+    public enum EventKind {
+        JOB_START,
+        JOB_FINISH,
+        PARTITION_EVENT
+    }
+
+    private final JobId jobId;
+    private final EventKind eventKind;
+    private final EntityId entityId;
+    private final Serializable payload;
+
+    /**
+     * Creates an event whose entity id and payload are both {@code null}.
+     *
+     * @param jobId the job the event refers to
+     * @param eventKind the kind of event
+     */
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind) {
+        this(jobId, eventKind, null, null);
+    }
+
+    /**
+     * Creates an event whose payload is {@code null}.
+     *
+     * @param jobId the job the event refers to
+     * @param eventKind the kind of event
+     * @param feedId the entity the event refers to
+     */
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId feedId) {
+        this(jobId, eventKind, feedId, null);
+    }
+
+    /**
+     * Creates a fully specified event; the other constructors delegate here.
+     *
+     * @param jobId the job the event refers to
+     * @param eventKind the kind of event
+     * @param feedId the entity the event refers to
+     * @param payload additional data carried with the event
+     */
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId feedId, Serializable payload) {
+        this.jobId = jobId;
+        this.eventKind = eventKind;
+        this.entityId = feedId;
+        this.payload = payload;
+    }
+
+    /** @return the job id given at construction */
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    /** @return the kind of event given at construction */
+    public EventKind getEventKind() {
+        return eventKind;
+    }
+
+    /** @return the entity id given at construction, or {@code null} if none was supplied */
+    public EntityId getFeedId() {
+        return entityId;
+    }
+
+    /** @return the payload given at construction, or {@code null} if none was supplied */
+    public Serializable getPayload() {
+        return payload;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
new file mode 100644
index 0000000..1e3eca1
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class ActiveJob implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(ActiveJob.class.getName());
+ protected final EntityId entityId;
+ protected JobId jobId;
+ protected final Serializable jobObject;
+ protected ActivityState state;
+ protected JobSpecification spec;
+
+ public ActiveJob(EntityId entityId, JobId jobId, ActivityState state, Serializable jobInfo, JobSpecification spec) {
+ this.entityId = entityId;
+ this.state = state;
+ this.jobId = jobId;
+ this.jobObject = jobInfo;
+ this.spec = spec;
+ }
+
+ public ActiveJob(EntityId entityId, ActivityState state, Serializable jobInfo, JobSpecification spec) {
+ this.entityId = entityId;
+ this.state = state;
+ this.jobObject = jobInfo;
+ this.spec = spec;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ public ActivityState getState() {
+ return state;
+ }
+
+ public void setState(ActivityState state) {
+ this.state = state;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(this + " is in " + state + " state.");
+ }
+ }
+
+ public Object getJobObject() {
+ return jobObject;
+ }
+
+ public JobSpecification getSpec() {
+ return spec;
+ }
+
+ public void setSpec(JobSpecification spec) {
+ this.spec = spec;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " [" + jobObject + "]";
+ }
+
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
new file mode 100644
index 0000000..9c69aca
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveEvent.EventKind;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Singleton that routes {@link ActiveEvent}s to the {@link IActiveEntityEventsListener}
+ * registered for the entity owning the event's job.
+ * <p>
+ * Events are taken from {@link #getEventInbox()} by {@link #run()}. A job is tracked once
+ * {@link #monitorJob(JobId, ActiveJob)} has accepted it, and is dropped again when a
+ * {@code JOB_FINISH} event for it has been processed.
+ * <p>
+ * Every method that modifies the two internal maps holds this object's monitor, as do the
+ * snapshot methods {@link #getActiveJobs()} and {@link #getEventListeners()}.
+ */
+public class ActiveJobNotificationHandler implements Runnable {
+    public static final ActiveJobNotificationHandler INSTANCE = new ActiveJobNotificationHandler();
+    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
+    private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
+    private static final boolean DEBUG = false;
+    private final LinkedBlockingQueue<ActiveEvent> eventInbox;
+    private final Map<EntityId, IActiveEntityEventsListener> entityEventListener;
+    private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;
+
+    private ActiveJobNotificationHandler() {
+        this.eventInbox = new LinkedBlockingQueue<>();
+        this.jobId2ActiveJobInfos = new HashMap<>();
+        this.entityEventListener = new HashMap<>();
+    }
+
+    /**
+     * Takes events from the inbox until the thread is interrupted and hands each one to the
+     * listener of the entity that owns the event's job.
+     * <p>
+     * An event whose job is not monitored, or whose entity has no listener, is logged and
+     * dropped rather than dereferenced. After a {@code JOB_FINISH} event the job is removed,
+     * and so is the listener if it reports that its entity is no longer active.
+     */
+    @Override
+    public void run() {
+        Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName());
+        LOGGER.log(Level.INFO, "Started " + ActiveJobNotificationHandler.class.getSimpleName());
+        while (!Thread.interrupted()) {
+            try {
+                ActiveEvent event = getEventInbox().take();
+                ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId());
+                if (jobInfo == null) {
+                    LOGGER.log(Level.WARNING, "Dropping event of type " + event.getEventKind()
+                            + " for a job that is not being monitored: " + event.getJobId());
+                    continue;
+                }
+                EntityId entityId = jobInfo.getEntityId();
+                IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+                if (listener == null) {
+                    LOGGER.log(Level.WARNING, "Dropping event of type " + event.getEventKind()
+                            + " since no listener is registered for the entity: " + entityId);
+                    // Nobody is left to notify, but a finished job must still stop being tracked.
+                    if (event.getEventKind() == EventKind.JOB_FINISH) {
+                        removeFinishedJob(event.getJobId());
+                    }
+                    continue;
+                }
+                if (DEBUG) {
+                    LOGGER.log(Level.INFO, "Next event is of type " + event.getEventKind());
+                    LOGGER.log(Level.INFO, "Notifying the listener");
+                }
+                listener.notify(event);
+                if (event.getEventKind() == EventKind.JOB_FINISH) {
+                    removeFinishedJob(event.getJobId());
+                    removeInactiveListener(listener);
+                }
+            } catch (InterruptedException e) {
+                // Restore the flag so the loop condition sees it and the loop ends.
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Error handling an active job event", e);
+            }
+        }
+        LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
+    }
+
+    /** Stops tracking the given job; holds the monitor like the other map mutators. */
+    private synchronized void removeFinishedJob(JobId jobId) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "Removing the job");
+        }
+        jobId2ActiveJobInfos.remove(jobId);
+    }
+
+    /** Unregisters the listener if it reports that its entity is no longer active. */
+    private void removeInactiveListener(IActiveEntityEventsListener listener) {
+        if (!listener.isEntityActive()) {
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Removing the listener since it is not active anymore");
+            }
+            // Query the listener before taking the monitor so its code never runs under our lock.
+            EntityId entityId = listener.getEntityId();
+            synchronized (this) {
+                entityEventListener.remove(entityId);
+            }
+        }
+    }
+
+    /**
+     * @param entityId the entity to look up
+     * @return the listener registered for the entity, or {@code null} if there is none
+     */
+    public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+            IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+            LOGGER.log(Level.INFO, "Listener found: " + listener);
+        }
+        return entityEventListener.get(entityId);
+    }
+
+    /** @return a snapshot array of the jobs currently being monitored */
+    public synchronized ActiveJob[] getActiveJobs() {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "getActiveJobs() was called");
+            LOGGER.log(Level.INFO, "Number of jobs found: " + jobId2ActiveJobInfos.size());
+        }
+        return jobId2ActiveJobInfos.values().toArray(new ActiveJob[jobId2ActiveJobInfos.size()]);
+    }
+
+    /**
+     * @param jobId the job to look up
+     * @return {@code true} if the job is currently being monitored
+     */
+    public boolean isActiveJob(JobId jobId) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "isActiveJob(JobId jobId) called with jobId: " + jobId);
+            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        }
+        return jobId2ActiveJobInfos.get(jobId) != null;
+    }
+
+    /**
+     * Inspects a newly created job. If its specification carries an {@link ActiveJob} under
+     * {@link #ACTIVE_ENTITY_PROPERTY_NAME}, the job is submitted for monitoring and, when
+     * monitoring was accepted, the entity's listener is told about the creation.
+     *
+     * @param jobId the id of the created job
+     * @param jobSpecification the specification of the created job
+     */
+    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO,
+                    "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
+                            + jobId);
+        }
+        Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
+        // instanceof is false for null, so a separate null check is unnecessary.
+        if (!(property instanceof ActiveJob)) {
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Job was is not active. property found to be: " + property);
+            }
+            return;
+        }
+        monitorJob(jobId, (ActiveJob) property);
+        if (DEBUG) {
+            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        }
+        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
+        if (jobInfo != null) {
+            EntityId entityId = jobInfo.getEntityId();
+            IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+            if (listener == null) {
+                // The listener can be unregistered between monitorJob() and this lookup.
+                LOGGER.log(Level.WARNING, "No listener was found for the entity: " + entityId);
+                return;
+            }
+            listener.notifyJobCreation(jobId, jobSpecification);
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Listener was notified" + jobId);
+            }
+        } else {
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Listener was not notified since it was not registered for the job " + jobId);
+            }
+        }
+    }
+
+    /** @return the queue from which {@link #run()} takes events */
+    public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
+        return eventInbox;
+    }
+
+    /** @return a snapshot array of the currently registered listeners */
+    public synchronized IActiveEntityEventsListener[] getEventListeners() {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "getEventListeners() was called");
+            LOGGER.log(Level.INFO, "returning " + entityEventListener.size() + " Listeners");
+        }
+        return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]);
+    }
+
+    /**
+     * Registers a listener under the entity id it reports.
+     *
+     * @param listener the listener to register
+     * @throws HyracksDataException if a listener is already registered for the same entity id
+     */
+    public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
+                    + listener.getEntityId());
+        }
+        if (entityEventListener.containsKey(listener.getEntityId())) {
+            throw new HyracksDataException(
+                    "Active Entity Listener " + listener.getEntityId() + " is already registered");
+        }
+        entityEventListener.put(listener.getEntityId(), listener);
+    }
+
+    /**
+     * Starts tracking a job. The request is logged and ignored when no listener is registered
+     * for the job's entity or when the job id is already being tracked.
+     *
+     * @param jobId the id of the job
+     * @param activeJob the job to track
+     */
+    public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        }
+        if (entityEventListener.containsKey(activeJob.getEntityId())) {
+            if (jobId2ActiveJobInfos.containsKey(jobId)) {
+                LOGGER.severe("Job is already being monitored for job: " + jobId);
+                return;
+            }
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "monitoring started for job id: " + jobId);
+            }
+            jobId2ActiveJobInfos.put(jobId, activeJob);
+        } else {
+            LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
new file mode 100644
index 0000000..5992294
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Per-node holder for a runtime registry, a frame pool and a map of runtimes keyed by their
+ * {@link ActiveRuntimeId}.
+ */
+public class ActiveManager {
+
+    private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
+
+    private final IActiveRuntimeRegistry activeRuntimeRegistry;
+
+    private final ConcurrentFramePool activeFramePool;
+
+    private final String nodeId;
+
+    /**
+     * @param nodeId the id of the node, passed on to the registry and the frame pool
+     * @param activeMemoryBudget passed to the {@link ConcurrentFramePool} constructor
+     * @param frameSize passed to the {@link ConcurrentFramePool} constructor
+     * @throws HyracksDataException declared by this constructor; the visible body does not throw it itself
+     */
+    public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException {
+        this.nodeId = nodeId;
+        this.activeRuntimeRegistry = new ActiveRuntimeRegistry(nodeId);
+        this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
+        this.runtimes = new ConcurrentHashMap<>();
+    }
+
+    /** @return the runtime registry created for this manager */
+    public IActiveRuntimeRegistry getActiveRuntimeRegistry() {
+        return activeRuntimeRegistry;
+    }
+
+    /** @return the frame pool created for this manager */
+    public ConcurrentFramePool getFramePool() {
+        return activeFramePool;
+    }
+
+    /**
+     * Registers the runtime under its id unless a runtime with that id is already registered,
+     * in which case the existing registration is kept.
+     *
+     * @param runtime the runtime to register
+     */
+    public void registerRuntime(IActiveRuntime runtime) {
+        // One atomic call instead of containsKey() followed by put(): two threads registering
+        // the same id can no longer both pass the check and overwrite one another.
+        runtimes.putIfAbsent(runtime.getRuntimeId(), runtime);
+    }
+
+    /** @param id the id of the runtime to remove; unknown ids are ignored */
+    public void deregisterRuntime(ActiveRuntimeId id) {
+        runtimes.remove(id);
+    }
+
+    /**
+     * @param subscribableRuntimeId the id to look up
+     * @return the runtime registered under the id, or {@code null} if there is none
+     */
+    public IActiveRuntime getSubscribableRuntime(ActiveRuntimeId subscribableRuntimeId) {
+        return runtimes.get(subscribableRuntimeId);
+    }
+
+    /** @return the simple class name followed by the node id in square brackets */
+    @Override
+    public String toString() {
+        return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
new file mode 100644
index 0000000..8875647
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Application message of type {@code ACTIVE_ENTITY_MESSAGE} that carries an entity id, a job id
+ * and a serializable payload.
+ */
+public class ActivePartitionMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final EntityId feedId;
+    private final JobId jobId;
+    private final Serializable payload;
+
+    /**
+     * @param feedId the entity the message refers to
+     * @param jobId the job the message refers to
+     * @param payload the data carried by the message
+     */
+    public ActivePartitionMessage(EntityId feedId, JobId jobId, Serializable payload) {
+        this.payload = payload;
+        this.jobId = jobId;
+        this.feedId = feedId;
+    }
+
+    /** @return always {@code ApplicationMessageType.ACTIVE_ENTITY_MESSAGE} */
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.ACTIVE_ENTITY_MESSAGE;
+    }
+
+    /** @return the payload given at construction */
+    public Serializable getPayload() {
+        return payload;
+    }
+
+    /** @return the job id given at construction */
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    /** @return the entity id given at construction */
+    public EntityId getFeedId() {
+        return feedId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
new file mode 100644
index 0000000..8b0914a
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+/**
+ * Basic {@link IActiveRuntime} that only holds the identifier it was created with.
+ */
+public class ActiveRuntime implements IActiveRuntime {
+
+    /** A unique identifier for the runtime **/
+    protected final ActiveRuntimeId runtimeId;
+
+    /**
+     * @param runtimeId the identifier returned by {@link #getRuntimeId()}
+     */
+    public ActiveRuntime(ActiveRuntimeId runtimeId) {
+        this.runtimeId = runtimeId;
+    }
+
+    /** @return the identifier given at construction */
+    @Override
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    /** @return the string form of the runtime id */
+    @Override
+    public String toString() {
+        return runtimeId.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
new file mode 100644
index 0000000..64926fd
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+/**
+ * Immutable, serializable key made of an entity id, a runtime type string and
+ * a partition number. The hash code is derived from {@link #toString()} once,
+ * at construction time.
+ */
+public class ActiveRuntimeId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final EntityId entityId;
+    private final String runtimeId;
+    private final int partition;
+    /** Cached hash of the string form; all inputs are final so it never changes. */
+    private final int hashCode;
+
+    /**
+     * @param entityId
+     *            the entity this runtime belongs to
+     * @param runtimeId
+     *            the runtime type string
+     * @param partition
+     *            the partition number
+     */
+    public ActiveRuntimeId(EntityId entityId, String runtimeId, int partition) {
+        this.entityId = entityId;
+        this.runtimeId = runtimeId;
+        this.partition = partition;
+        this.hashCode = toString().hashCode();
+    }
+
+    /**
+     * @return the entity id this identifier was built with
+     */
+    public EntityId getFeedId() {
+        return entityId;
+    }
+
+    /**
+     * @return the runtime type string (same value as {@link #getRuntimeType()})
+     */
+    public String getFeedRuntimeType() {
+        return runtimeId;
+    }
+
+    /**
+     * @return the runtime type string
+     */
+    public String getRuntimeType() {
+        return runtimeId;
+    }
+
+    /**
+     * @return the partition number
+     */
+    public int getPartition() {
+        return partition;
+    }
+
+    /**
+     * Two identifiers are equal when their entity ids, runtime type strings
+     * and partitions all match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof ActiveRuntimeId) {
+            final ActiveRuntimeId that = (ActiveRuntimeId) o;
+            if (!that.entityId.equals(entityId)) {
+                return false;
+            }
+            if (!that.getFeedRuntimeType().equals(runtimeId)) {
+                return false;
+            }
+            return that.getPartition() == partition;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    /**
+     * @return {@code (entityId)[partition]:runtimeId}
+     */
+    @Override
+    public String toString() {
+        return "(" + entityId + ")[" + partition + "]:" + runtimeId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
new file mode 100644
index 0000000..9743856
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Keeps the runtimes registered under their {@link ActiveRuntimeId} and owns a
+ * cached thread pool that is shut down by {@link #close()}.
+ */
+public class ActiveRuntimeManager {
+
+    private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
+
+    private final Map<ActiveRuntimeId, ActiveRuntime> activeRuntimes = new ConcurrentHashMap<>();
+    private final ExecutorService executorService = Executors.newCachedThreadPool();
+
+    public ActiveRuntimeManager() {
+        // fields are initialised at their declarations
+    }
+
+    /**
+     * Shuts the executor service down, waits up to ten seconds for running
+     * tasks to finish and forces a shutdown if they have not terminated by then.
+     * If the waiting thread is interrupted, its interrupt flag is restored.
+     *
+     * @throws IOException
+     *             declared for callers; not thrown by the code in this method
+     */
+    public void close() throws IOException {
+        if (executorService == null) {
+            return;
+        }
+        final String managerName = ActiveRuntimeManager.class.getSimpleName();
+        executorService.shutdown();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Shut down executor service for :" + managerName);
+        }
+        try {
+            executorService.awaitTermination(10L, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.log(Level.SEVERE,
+                    managerName + " was interrupted while waiting for runtime managers to shutdown", e);
+        }
+        if (executorService.isTerminated()) {
+            return;
+        }
+        LOGGER.severe(managerName + " failed to shutdown successfully. Will be forced to shutdown");
+        executorService.shutdownNow();
+    }
+
+    /**
+     * @return the runtime registered under {@code runtimeId}, or {@code null} if there is none
+     */
+    public ActiveRuntime getFeedRuntime(ActiveRuntimeId runtimeId) {
+        return activeRuntimes.get(runtimeId);
+    }
+
+    /**
+     * Registers {@code feedRuntime} under {@code runtimeId}, replacing any previous entry.
+     */
+    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveRuntime feedRuntime) {
+        activeRuntimes.put(runtimeId, feedRuntime);
+    }
+
+    /**
+     * Removes the runtime registered under {@code runtimeId}, if any.
+     */
+    public synchronized void deregisterRuntime(ActiveRuntimeId runtimeId) {
+        activeRuntimes.remove(runtimeId);
+    }
+
+    /**
+     * @return the executor service owned by this manager
+     */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * @return a live key-set view of the registered runtime ids
+     */
+    public Set<ActiveRuntimeId> getFeedRuntimes() {
+        return activeRuntimes.keySet();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
new file mode 100644
index 0000000..050426c
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An implementation of the {@code IActiveRuntimeRegistry} interface.
+ * Provides the central repository for registering/retrieving
+ * artifacts/services associated with an active entity.
+ * <p>
+ * The manager map is a plain {@link HashMap}; every access to it is guarded by
+ * this object's monitor.
+ */
+public class ActiveRuntimeRegistry implements IActiveRuntimeRegistry {
+
+    private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeRegistry.class.getName());
+
+    private final Map<ActiveRuntimeId, ActiveRuntimeManager> activeRuntimeManagers = new HashMap<>();
+    private final String nodeId;
+
+    /**
+     * @param nodeId
+     *            identifier shown by {@link #toString()}
+     */
+    public ActiveRuntimeRegistry(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Removes the manager registered for {@code runtimeId}, deregisters the
+     * runtime from it and closes it. Failures while closing are logged and
+     * not propagated. Does nothing when no manager is registered.
+     *
+     * @param runtimeId
+     *            the id of the runtime to deregister
+     */
+    @Override
+    public void deregisterRuntime(ActiveRuntimeId runtimeId) {
+        ActiveRuntimeManager mgr;
+        // Only the map update happens under the lock: closing a manager may wait
+        // for its executor to terminate and must not block other callers.
+        synchronized (this) {
+            mgr = activeRuntimeManagers.remove(runtimeId);
+        }
+        if (mgr == null) {
+            return;
+        }
+        try {
+            mgr.deregisterRuntime(runtimeId);
+            mgr.close();
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Exception in closing feed runtime" + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Registers {@code runtime} under its own id, creating a manager for that
+     * id if none exists yet.
+     *
+     * @param runtime
+     *            the runtime to register
+     */
+    @Override
+    public synchronized void registerRuntime(ActiveRuntime runtime) {
+        ActiveRuntimeManager runtimeMgr = activeRuntimeManagers.get(runtime.getRuntimeId());
+        if (runtimeMgr == null) {
+            runtimeMgr = new ActiveRuntimeManager();
+            activeRuntimeManagers.put(runtime.getRuntimeId(), runtimeMgr);
+        }
+        runtimeMgr.registerRuntime(runtime.getRuntimeId(), runtime);
+    }
+
+    /**
+     * @param runtimeId
+     *            the id to look up
+     * @return the runtime registered under {@code runtimeId}, or {@code null} if there is none
+     */
+    @Override
+    public synchronized ActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
+        ActiveRuntimeManager runtimeMgr = activeRuntimeManagers.get(runtimeId);
+        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(runtimeId) : null;
+    }
+
+    @Override
+    public String toString() {
+        return ActiveRuntimeRegistry.class.getSimpleName() + "[" + nodeId + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java
new file mode 100644
index 0000000..9538118
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.active;
+
+import java.util.Map;
+
+public class Activity implements Comparable<Activity> {
+
+ protected int activityId;
+ protected final EntityId activeEntityId;
+ protected final Map<String, String> activityDetails;
+
+ public Activity(EntityId activeEntityId, Map<String, String> activityDetails) {
+ this.activeEntityId = activeEntityId;
+ this.activityDetails = activityDetails;
+ }
+
+ public String getDataverseName() {
+ return activeEntityId.getDataverse();
+ }
+
+ public String getActiveEntityName() {
+ return activeEntityId.getEntityName();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof Activity)) {
+ return false;
+ }
+ return ((Activity) other).activeEntityId.equals(activeEntityId)
+ && ((Activity) other).getActivityId() != (activityId);
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return activeEntityId + "." + activityId;
+ }
+
+ public int getActivityId() {
+ return activityId;
+ }
+
+ public void setActivityId(int activityId) {
+ this.activityId = activityId;
+ }
+
+ public Map<String, String> getActivityDetails() {
+ return activityDetails;
+ }
+
+ @Override
+ public int compareTo(Activity o) {
+ return o.getActivityId() - this.activityId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
new file mode 100644
index 0000000..1301535
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+// States an activity can be in. TODO: Document the state machine and its transitions.
+public enum ActivityState {
+ CREATED,
+ INACTIVE,
+ ACTIVE,
+ UNDER_RECOVERY,
+ ENDED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
new file mode 100644
index 0000000..afe3b06
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+/**
+ * A budgeted pool of {@link ByteBuffer} frames.
+ * <p>
+ * The budget is expressed as a number of default-size frames
+ * ({@code budgetInBytes / frameSize}). Buffers are either of the default size
+ * or an integral multiple of it; released buffers are kept for reuse, and
+ * cached buffers of other sizes are dropped when room is needed for a new
+ * allocation. Callers that cannot be served immediately may
+ * {@link #subscribe(FrameAction)}; queued subscribers are served in FIFO order
+ * when buffers are released and take priority over the {@code get()} and
+ * {@code get(int)} calls.
+ * <p>
+ * All state is guarded by this object's monitor.
+ */
+public class ConcurrentFramePool {
+    private static final boolean DEBUG = false;
+    private static final String ERROR_INVALID_FRAME_SIZE =
+            "The size should be an integral multiple of the default frame size";
+    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+            "The requested frame size must not be greater than the allocated budget";
+    private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
+    private final String nodeId;
+    /** Maximum number of default-size frames that may be handed out at once. */
+    private final int budget;
+    private final int defaultFrameSize;
+    /** Released default-size buffers available for reuse. */
+    private final ArrayDeque<ByteBuffer> pool;
+    /** Pending requests, served in FIFO order by {@link #release(ByteBuffer)}. */
+    private final ArrayDeque<FrameAction> subscribers = new ArrayDeque<>();
+    /** Released large buffers, keyed by their size in default-size frames. */
+    private final Map<Integer, ArrayDeque<ByteBuffer>> largeFramesPools;
+    /** Default-size frame units currently held by callers. */
+    private int handedOut;
+    /** Default-size frame units allocated by this pool and not yet dropped. */
+    private int created;
+
+    /**
+     * @param nodeId
+     *            identifier shown by {@link #toString()}
+     * @param budgetInBytes
+     *            total budget in bytes; rounded down to whole frames
+     * @param frameSize
+     *            default frame size in bytes
+     */
+    public ConcurrentFramePool(String nodeId, long budgetInBytes, int frameSize) {
+        this.nodeId = nodeId;
+        this.defaultFrameSize = frameSize;
+        this.budget = (int) (budgetInBytes / frameSize);
+        this.pool = new ArrayDeque<>(budget);
+        this.largeFramesPools = new HashMap<>();
+    }
+
+    /**
+     * @return the size in bytes of the largest buffer this pool can provide
+     */
+    public int getMaxFrameSize() {
+        return budget * defaultFrameSize;
+    }
+
+    /**
+     * Gets one default-size buffer.
+     *
+     * @return a cleared buffer, or {@code null} if subscribers are waiting or the budget is exhausted
+     */
+    public synchronized ByteBuffer get() {
+        // Subscribers have higher priority
+        if (subscribers.isEmpty()) {
+            return doGet();
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
+        return null;
+    }
+
+    /** Hands out one default-size buffer if the budget allows, else returns {@code null}. */
+    private ByteBuffer doGet() {
+        if (handedOut < budget) {
+            handedOut++;
+            return allocate();
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = 1");
+        }
+        return null;
+    }
+
+    /**
+     * @return the number of default-size frames that can still be handed out
+     */
+    public synchronized int remaining() {
+        // synchronized so the read of handedOut sees updates made under the same lock
+        return budget - handedOut;
+    }
+
+    /**
+     * Hands out a buffer of {@code bufferSize} bytes if the budget allows.
+     *
+     * @return the buffer, or {@code null} when the remaining budget is insufficient
+     * @throws HyracksDataException
+     *             if the size is not a multiple of the default frame size or exceeds the whole budget
+     */
+    private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
+        if (bufferSize % defaultFrameSize != 0) {
+            throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
+        }
+        int multiplier = bufferSize / defaultFrameSize;
+        if (multiplier > budget) {
+            throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+        }
+        if (handedOut + multiplier <= budget) {
+            handedOut += multiplier;
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
+            if (largeFramesPool == null || largeFramesPool.isEmpty()) {
+                // No cached buffer of this size: drop cached buffers of other sizes if needed
+                if (created + multiplier > budget) {
+                    freeup(multiplier);
+                }
+                created += multiplier;
+                return ByteBuffer.allocate(bufferSize);
+            }
+            ByteBuffer buffer = largeFramesPool.poll();
+            buffer.clear();
+            return buffer;
+        }
+        // Not enough budget
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = " + multiplier);
+        }
+        return null;
+    }
+
+    /**
+     * Gets a buffer of {@code bufferSize} bytes.
+     *
+     * @return the buffer, or {@code null} if subscribers are waiting or the remaining budget is insufficient
+     * @throws HyracksDataException
+     *             if the size is not a multiple of the default frame size or exceeds the whole budget
+     */
+    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+        // Subscribers have higher priority
+        if (subscribers.isEmpty()) {
+            return doGet(bufferSize);
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
+        return null;
+    }
+
+    /**
+     * Drops cached buffers so that {@code desiredNumberOfFreePages} frame units can be
+     * created without {@code created} exceeding the budget. Cached large buffers of
+     * other sizes are dropped first, then cached default-size buffers.
+     *
+     * @return the number of frame units dropped
+     */
+    private int freeup(int desiredNumberOfFreePages) {
+        int needToFree = desiredNumberOfFreePages - (budget - created);
+        int freed = 0;
+        // start by large frames
+        for (Iterator<Entry<Integer, ArrayDeque<ByteBuffer>>> it = largeFramesPools.entrySet().iterator(); it
+                .hasNext();) {
+            Entry<Integer, ArrayDeque<ByteBuffer>> entry = it.next();
+            if (entry.getKey() != desiredNumberOfFreePages) {
+                while (!entry.getValue().isEmpty()) {
+                    entry.getValue().pop();
+                    freed += entry.getKey();
+                    if (freed >= needToFree) {
+                        // created is handled here
+                        created -= freed;
+                        return freed;
+                    }
+                }
+                it.remove();
+            }
+        }
+        // freed all large pages. need to free small pages as well
+        needToFree -= freed;
+        while (needToFree > 0) {
+            pool.pop();
+            needToFree--;
+            freed++;
+        }
+        created -= freed;
+        return freed;
+    }
+
+    /** Returns a cleared default-size buffer, reusing a cached one when available. */
+    private ByteBuffer allocate() {
+        if (pool.isEmpty()) {
+            if (created == budget) {
+                freeup(1);
+            }
+            created++;
+            return ByteBuffer.allocate(defaultFrameSize);
+        } else {
+            ByteBuffer buffer = pool.pop();
+            buffer.clear();
+            return buffer;
+        }
+    }
+
+    /**
+     * Adds {@code count} default-size buffers to {@code buffers}, all or nothing.
+     *
+     * @return {@code true} if the buffers were added, {@code false} if the budget does not allow it
+     */
+    public synchronized boolean get(Collection<ByteBuffer> buffers, int count) {
+        if (handedOut + count <= budget) {
+            handedOut += count;
+            for (int i = 0; i < count; i++) {
+                buffers.add(allocate());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ConcurrentFramePool [" + nodeId + "]" + "(consumed:" + handedOut + "/" + budget + ")";
+    }
+
+    /**
+     * Releases every buffer in {@code buffers}; see {@link #release(ByteBuffer)}.
+     */
+    public synchronized void release(Collection<ByteBuffer> buffers) throws HyracksDataException {
+        for (ByteBuffer buffer : buffers) {
+            release(buffer);
+        }
+    }
+
+    /**
+     * Returns {@code buffer} to the pool and then serves as many waiting subscribers,
+     * in FIFO order, as the freed budget allows.
+     * <p>
+     * A subscriber is removed from the queue before its callback runs, so a failing
+     * callback is never retried. If a callback fails, its buffer is reclaimed (unless
+     * the callback already released it), the remaining subscribers are still served,
+     * and the first failure is rethrown at the end with later ones attached as
+     * suppressed exceptions.
+     *
+     * @param buffer
+     *            a buffer previously obtained from this pool
+     * @throws HyracksDataException
+     *             if a queued request has an invalid size
+     */
+    public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
+        reclaim(buffer);
+        RuntimeException failure = null;
+        // check subscribers
+        while (!subscribers.isEmpty()) {
+            FrameAction frameAction = subscribers.peek();
+            ByteBuffer freeBuffer;
+            // check if we have enough and answer immediately.
+            if (frameAction.getSize() == defaultFrameSize) {
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
+                }
+                freeBuffer = doGet();
+            } else {
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested "
+                            + frameAction.getSize() / defaultFrameSize + " frames");
+                }
+                freeBuffer = doGet(frameAction.getSize());
+            }
+            if (freeBuffer == null) {
+                if (DEBUG) {
+                    LOGGER.info("Failed to allocate requested frames");
+                }
+                break;
+            }
+            // Dequeue before calling back so that a failing subscriber cannot be served twice.
+            subscribers.remove();
+            int handedOutBeforeCall = handedOut;
+            try {
+                frameAction.call(freeBuffer);
+                if (DEBUG) {
+                    LOGGER.info(
+                            "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
+                }
+            } catch (RuntimeException e) {
+                LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+                // Only reclaim if the callback did not already give the buffer back.
+                if (handedOut == handedOutBeforeCall) {
+                    reclaim(freeBuffer);
+                }
+                if (failure == null) {
+                    failure = e;
+                } else {
+                    failure.addSuppressed(e);
+                }
+            }
+        }
+        if (DEBUG) {
+            LOGGER.info(subscribers.size() + " remaining subscribers");
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /** Puts {@code buffer} back into the matching cache and credits the budget; does not notify subscribers. */
+    private void reclaim(ByteBuffer buffer) {
+        int multiples = buffer.capacity() / defaultFrameSize;
+        handedOut -= multiples;
+        if (DEBUG) {
+            LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
+        }
+        if (multiples == 1) {
+            pool.add(buffer);
+        } else {
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiples);
+            if (largeFramesPool == null) {
+                largeFramesPool = new ArrayDeque<>();
+                largeFramesPools.put(multiples, largeFramesPool);
+            }
+            largeFramesPool.push(buffer);
+        }
+    }
+
+    /**
+     * Serves {@code frameAction} immediately when no one is waiting and the budget
+     * allows; otherwise queues it to be served by a later release.
+     *
+     * @return {@code false} if the action was performed immediately, {@code true} if it was queued
+     * @throws HyracksDataException
+     *             if the requested size is not a multiple of the default frame size or exceeds the whole budget
+     */
+    public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
+        // check if subscribers are empty?
+        if (subscribers.isEmpty()) {
+            ByteBuffer buffer;
+            // check if we have enough and answer immediately.
+            if (frameAction.getSize() == defaultFrameSize) {
+                buffer = doGet();
+            } else {
+                buffer = doGet(frameAction.getSize());
+            }
+            if (buffer != null) {
+                frameAction.call(buffer);
+                // There is no need to subscribe. perform action and return false
+                return false;
+            }
+        } else {
+            // Apply the same validation as the immediate path, so an unservable request
+            // is rejected here instead of failing every later release.
+            int size = frameAction.getSize();
+            if (size % defaultFrameSize != 0) {
+                throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
+            }
+            int multiplier = size / defaultFrameSize;
+            if (multiplier > budget) {
+                throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+            }
+        }
+        // none of the above, add to subscribers and return true
+        subscribers.add(frameAction);
+        return true;
+    }
+
+    /*
+     * For unit testing purposes
+     */
+    public Collection<FrameAction> getSubscribers() {
+        return subscribers;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
new file mode 100644
index 0000000..cdf702d
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+/**
+ * A unique identifier for an active entity, made of an extension name, a
+ * dataverse and an entity name. Instances are immutable and cache their hash
+ * code, which is derived from {@link #toString()}.
+ */
+public class EntityId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String extensionName;
+    private final String dataverse;
+    private final String entityName;
+    private final int hashCode;
+
+    /**
+     * @param extentionName
+     *            name of the extension the entity belongs to
+     * @param dataverse
+     *            dataverse of the entity
+     * @param entityName
+     *            name of the entity
+     */
+    public EntityId(String extentionName, String dataverse, String entityName) {
+        this.extensionName = extentionName;
+        this.dataverse = dataverse;
+        this.entityName = entityName;
+        this.hashCode = toString().hashCode();
+    }
+
+    public String getExtensionName() {
+        return extensionName;
+    }
+
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    /**
+     * Two ids are equal when extension name, entity name and dataverse all match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        // instanceof is false for null, so this also covers the null case
+        if (!(o instanceof EntityId)) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
+        final EntityId that = (EntityId) o;
+        return that.getExtensionName().equals(extensionName)
+                && that.getEntityName().equals(entityName)
+                && that.getDataverse().equals(dataverse);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    /**
+     * @return {@code dataverse.entityName(extensionName)}
+     */
+    @Override
+    public String toString() {
+        return dataverse + "." + entityName + "(" + extensionName + ")";
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
new file mode 100644
index 0000000..849d360
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.nio.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Carries a pending frame and acts as the hand-off point for the buffer that
+ * eventually receives it: {@link #call(ByteBuffer)} fills and publishes the
+ * buffer, {@link #retrieve()} blocks until one has been published.
+ */
+public class FrameAction {
+    private static final boolean DEBUG = false;
+    private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
+    private ByteBuffer allocated;
+    private ByteBuffer frame;
+
+    /** Logs {@code message} at info level when debugging is switched on. */
+    private static void trace(String message) {
+        if (DEBUG) {
+            LOGGER.info(message);
+        }
+    }
+
+    /**
+     * Copies the pending frame into {@code freeFrame}, publishes it as the
+     * allocated buffer and wakes up every thread waiting in {@link #retrieve()}.
+     *
+     * @param freeFrame
+     *            the buffer that receives the pending frame
+     */
+    public void call(ByteBuffer freeFrame) {
+        trace("FrameAction: My subscription is being answered");
+        freeFrame.put(frame);
+        synchronized (this) {
+            allocated = freeFrame;
+            trace("FrameAction: Waking up waiting threads");
+            notifyAll();
+        }
+    }
+
+    /**
+     * Waits until a buffer has been published by {@link #call(ByteBuffer)},
+     * then hands it over and clears the slot.
+     *
+     * @return the published buffer
+     * @throws InterruptedException
+     *             if the thread is interrupted while waiting
+     */
+    public synchronized ByteBuffer retrieve() throws InterruptedException {
+        trace("FrameAction: Attempting to get allocated buffer");
+        while (allocated == null) {
+            trace("FrameAction: Allocated buffer is not ready yet. I will wait for it");
+            wait();
+            trace("FrameAction: Awoken Up");
+        }
+        final ByteBuffer published = allocated;
+        allocated = null;
+        return published;
+    }
+
+    /**
+     * @param frame
+     *            the pending frame to copy once a buffer becomes available
+     */
+    public void setFrame(ByteBuffer frame) {
+        this.frame = frame;
+    }
+
+    /**
+     * @return the capacity of the pending frame
+     */
+    public int getSize() {
+        return frame.capacity();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
new file mode 100644
index 0000000..156576c
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Callback interface for events concerning a single active entity.
+ * NOTE(review): when and by whom these callbacks are invoked is not visible
+ * from this file; confirm against the notification handler.
+ */
+public interface IActiveEntityEventsListener {
+
+    /**
+     * @param message
+     *            the event to deliver to this listener
+     */
+    void notify(ActiveEvent message);
+
+    /**
+     * @param jobId
+     *            id of the created job
+     * @param jobSpecification
+     *            specification of the created job
+     */
+    void notifyJobCreation(JobId jobId, JobSpecification jobSpecification);
+
+    /**
+     * @return whether the entity is considered active by this listener
+     */
+    boolean isEntityActive();
+
+    /**
+     * @return the id of the entity this listener is associated with
+     */
+    EntityId getEntityId();
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java
new file mode 100644
index 0000000..e4c7171
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.dataflow.value.JSONSerializable;
+
+/**
+ * A serializable message with an associated {@link MessageType}.
+ *
+ * @deprecated
+ *             This interface is expected to go away. Use the IMessageBroker interfaces
+ *             to exchange messages instead.
+ */
+@Deprecated
+public interface IActiveMessage extends Serializable, JSONSerializable {
+
+    /** The kinds of message that can be exchanged. */
+    enum MessageType {
+        END
+    }
+
+    /**
+     * Gets the type associated with this message
+     *
+     * @return MessageType type associated with this message
+     */
+    MessageType getMessageType();
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
new file mode 100644
index 0000000..32c5c50
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IActiveRuntime {
+
+ /**
+ * @return the unique runtime id ({@link ActiveRuntimeId}) associated with this active runtime
+ */
+ public ActiveRuntimeId getRuntimeId();
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
new file mode 100644
index 0000000..b2c6f8e
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.IOException;
+
+public interface IActiveRuntimeRegistry {
+
+ /**
+ * Registers a runtime with this registry.
+ *
+ * @param runtime
+ * the runtime to register
+ */
+ public void registerRuntime(ActiveRuntime runtime);
+
+ /**
+ * Obtains the runtime corresponding to the given runtime id.
+ *
+ * @param runtimeId the id of the runtime to look up
+ * @return the corresponding runtime; the result for an unregistered id is not specified by this interface
+ */
+ public ActiveRuntime getRuntime(ActiveRuntimeId runtimeId);
+
+ /**
+ * De-registers the runtime identified by the given runtime id.
+ *
+ * @param runtimeId
+ * the id of the runtime to de-register
+ */
+ void deregisterRuntime(ActiveRuntimeId runtimeId);
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 327154f..d623830 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.metadata.declared.AqlDataSource;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.declared.AqlSourceId;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index a161717..83e4375 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -47,7 +47,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair;
*/
public abstract class AbstractLangTranslator {
- protected static final Logger LOGGER = Logger.getLogger(AbstractLangTranslator.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(AbstractLangTranslator.class.getName());
protected static final Map<String, BuiltinType> builtinTypeMap = AsterixBuiltinTypeMap.getBuiltinTypes();
@@ -107,7 +107,7 @@ public abstract class AbstractLangTranslator {
String message = null;
String dataverse = defaultDataverse != null ? defaultDataverse.getDataverseName() : null;
switch (stmt.getKind()) {
- case INSERT:
+ case Statement.INSERT:
InsertStatement insertStmt = (InsertStatement) stmt;
if (insertStmt.getDataverseName() != null) {
dataverse = insertStmt.getDataverseName().getValue();
@@ -119,7 +119,7 @@ public abstract class AbstractLangTranslator {
}
break;
- case DELETE:
+ case Statement.DELETE:
DeleteStatement deleteStmt = (DeleteStatement) stmt;
if (deleteStmt.getDataverseName() != null) {
dataverse = deleteStmt.getDataverseName().getValue();
@@ -131,7 +131,7 @@ public abstract class AbstractLangTranslator {
}
break;
- case NODEGROUP_DROP:
+ case Statement.NODEGROUP_DROP:
String nodegroupName = ((NodeGroupDropStatement) stmt).getNodeGroupName().getValue();
invalidOperation = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME.equals(nodegroupName);
if (invalidOperation) {
@@ -139,16 +139,16 @@ public abstract class AbstractLangTranslator {
}
break;
- case DATAVERSE_DROP:
+ case Statement.DATAVERSE_DROP:
DataverseDropStatement dvDropStmt = (DataverseDropStatement) stmt;
- invalidOperation = MetadataConstants.METADATA_DATAVERSE_NAME
- .equals(dvDropStmt.getDataverseName().getValue());
+ invalidOperation =
+ MetadataConstants.METADATA_DATAVERSE_NAME.equals(dvDropStmt.getDataverseName().getValue());
if (invalidOperation) {
message = "Cannot drop dataverse:" + dvDropStmt.getDataverseName().getValue();
}
break;
- case DATASET_DROP:
+ case Statement.DATASET_DROP:
DropStatement dropStmt = (DropStatement) stmt;
if (dropStmt.getDataverseName() != null) {
dataverse = dropStmt.getDataverseName().getValue();
@@ -159,7 +159,7 @@ public abstract class AbstractLangTranslator {
+ MetadataConstants.METADATA_DATAVERSE_NAME;
}
break;
- case DATASET_DECL:
+ case Statement.DATASET_DECL:
DatasetDecl datasetStmt = (DatasetDecl) stmt;
Map<String, String> hints = datasetStmt.getHints();
if (hints != null && !hints.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 9d092a7..7a3b2a4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
import org.apache.asterix.lang.common.base.Expression;
-import org.apache.asterix.lang.common.base.Statement.Kind;
+import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.om.types.IAType;
@@ -38,7 +38,7 @@ public class CompiledStatements {
public static interface ICompiledStatement {
- public Kind getKind();
+ public byte getKind();
}
public static class CompiledDatasetDropStatement implements ICompiledStatement {
@@ -59,8 +59,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.DATASET_DROP;
+ public byte getKind() {
+ return Statement.DATASET_DROP;
}
}
@@ -83,8 +83,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.CREATE_DATAVERSE;
+ public byte getKind() {
+ return Statement.CREATE_DATAVERSE;
}
}
@@ -100,8 +100,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.NODEGROUP_DROP;
+ public byte getKind() {
+ return Statement.NODEGROUP_DROP;
}
}
@@ -129,8 +129,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.INDEX_DROP;
+ public byte getKind() {
+ return Statement.INDEX_DROP;
}
}
@@ -152,8 +152,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.DATAVERSE_DROP;
+ public byte getKind() {
+ return Statement.DATAVERSE_DROP;
}
}
@@ -169,8 +169,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.TYPE_DROP;
+ public byte getKind() {
+ return Statement.TYPE_DROP;
}
}
@@ -241,8 +241,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.CREATE_INDEX;
+ public byte getKind() {
+ return Statement.CREATE_INDEX;
}
}
@@ -285,8 +285,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.LOAD;
+ public byte getKind() {
+ return Statement.LOAD;
}
}
@@ -322,8 +322,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.INSERT;
+ public byte getKind() {
+ return Statement.INSERT;
}
}
@@ -334,8 +334,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.UPSERT;
+ public byte getKind() {
+ return Statement.UPSERT;
}
}
@@ -380,8 +380,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.CONNECT_FEED;
+ public byte getKind() {
+ return Statement.CONNECT_FEED;
}
public String getPolicyName() {
@@ -405,7 +405,7 @@ public class CompiledStatements {
}
public String getFeedName() {
- return request.getReceivingFeedId().getFeedName();
+ return request.getReceivingFeedId().getEntityName();
}
@Override
@@ -418,8 +418,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.SUBSCRIBE_FEED;
+ public byte getKind() {
+ return Statement.SUBSCRIBE_FEED;
}
}
@@ -449,8 +449,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.DISCONNECT_FEED;
+ public byte getKind() {
+ return Statement.DISCONNECT_FEED;
}
}
@@ -494,8 +494,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.DELETE;
+ public byte getKind() {
+ return Statement.DELETE;
}
}
@@ -518,8 +518,8 @@ public class CompiledStatements {
}
@Override
- public Kind getKind() {
- return Kind.COMPACT;
+ public byte getKind() {
+ return Statement.COMPACT;
}
}