You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/29 00:02:27 UTC
[11/11] incubator-apex-core git commit: APEX-39 Added store and
embeddable interfaces for app data which allow query operators to be embedded
in a store. Also added detection of a store with an embeddable query
operator.
APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d748ed46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d748ed46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d748ed46
Branch: refs/heads/devel-3
Commit: d748ed46fb2ef91d74a70c2be52dc4a56bb4df71
Parents: aceaeeb
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Aug 7 17:47:46 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 14:19:55 2015 -0700
----------------------------------------------------------------------
.../common/experimental/AppData.java | 53 +++++++++++++++++++-
.../stram/StreamingContainerManager.java | 31 ++++++++++--
2 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/common/src/main/java/com/datatorrent/common/experimental/AppData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index fbdc82c..22259c5 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -15,13 +15,15 @@
*/
package com.datatorrent.common.experimental;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-
import org.apache.hadoop.classification.InterfaceStability;
/**
@@ -33,6 +35,55 @@ import org.apache.hadoop.classification.InterfaceStability;
public interface AppData
{
/**
+ * This interface is for App Data stores which support embedding a query operator.
+ * @param <QUERY_TYPE> The type of the query tuple emitted by the embedded query operator.
+ */
+ interface Store<QUERY_TYPE> extends Operator.ActivationListener<OperatorContext>
+ {
+ /**
+ * Gets the query connector which is used by the store operator to receive queries. If this method returns
+ * null then this Store should have a separate query operator connected to it.
+ * @return The query connector which is used by the store operator to receive queries.
+ */
+ public EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider();
+
+ /**
+ * Sets the query connector which is used by the store operator to receive queries. The store operator will call
+ * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method of the embeddable query operator before
+ * its {@link Operator#setup} method is called.
+ * @param embeddableQueryInfoProvider The query connector which is used by the store operator to receive queries.
+ */
+ public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider);
+ }
+
+ /**
+ * This interface represents a query operator which can be embedded into an AppData data source. This operator could also
+ * be used as a standalone operator. The distinction between being used in a standalone or embedded context is made by
+ * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method. If this method is called at least once then the {@link EmbeddableQueryInfoProvider}
+ * will operate as if it were embedded in an {@link AppData.Store} operator. If this method is never called then the operator will behave as if
+ * it were a standalone operator.<br/><br/>
+ * <b>Note:</b> When an {@link EmbeddableQueryInfoProvider} is set on an {@link AppData.Store} then its {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+ * method is called before {@link Operator#setup}.
+ * @param <QUERY_TYPE> The type of the query emitted by the operator.
+ */
+ interface EmbeddableQueryInfoProvider<QUERY_TYPE> extends Operator, ConnectionInfoProvider, Operator.ActivationListener<OperatorContext>
+ {
+ /**
+ * Gets the output port for queries.
+ * @return The output port for queries.
+ */
+ public DefaultOutputPort<QUERY_TYPE> getOutputPort();
+
+ /**
+ * If this method is called at least once then this operator will work as if it were embedded in an {@link AppData.Store}.
+ * If this method is never called then this operator will behave as a standalone operator. When an {@link EmbeddableQueryInfoProvider}
+ * is set on an {@link AppData.Store} then the {@link AppData.Store} will call the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+ * method once before the {@link Operator#setup} method is called.
+ */
+ public void enableEmbeddedMode();
+ }
+
+ /**
* This interface should be implemented by AppData Query and Result operators.
*/
interface ConnectionInfoProvider
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 7002c1d..7944a4b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -132,6 +132,7 @@ public class StreamingContainerManager implements PlanContext
public final static String APP_META_FILENAME = "meta.json";
public final static String APP_META_KEY_ATTRIBUTES = "attributes";
public final static String APP_META_KEY_METRICS = "metrics";
+ public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes
public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
@@ -567,6 +568,22 @@ public class StreamingContainerManager implements PlanContext
String queryUrl = null;
String queryTopic = null;
+ boolean hasEmbeddedQuery = false;
+
+ //Discover embeddable query connectors
+ if (operatorMeta.getOperator() instanceof AppData.Store<?>) {
+ AppData.Store<?> store = (AppData.Store<?>)operatorMeta.getOperator();
+ AppData.EmbeddableQueryInfoProvider<?> embeddableQuery = store.getEmbeddableQueryInfoProvider();
+
+ if (embeddableQuery != null) {
+ hasEmbeddedQuery = true;
+ queryOperatorName = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX;
+ queryUrl = embeddableQuery.getAppDataURL();
+ queryTopic = embeddableQuery.getTopic();
+ }
+ }
+
+ //Discover separate query operators
LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId());
for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) {
LogicalPlan.InputPortMeta portMeta = entry.getKey();
@@ -574,10 +591,16 @@ public class StreamingContainerManager implements PlanContext
if (queryUrl == null) {
OperatorMeta queryOperatorMeta = entry.getValue().getSource().getOperatorMeta();
if (queryOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
- AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider) queryOperatorMeta.getOperator();
- queryOperatorName = queryOperatorMeta.getName();
- queryUrl = queryOperator.getAppDataURL();
- queryTopic = queryOperator.getTopic();
+ if (!hasEmbeddedQuery) {
+ AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider)queryOperatorMeta.getOperator();
+ queryOperatorName = queryOperatorMeta.getName();
+ queryUrl = queryOperator.getAppDataURL();
+ queryTopic = queryOperator.getTopic();
+ } else {
+ LOG.warn("An embeddable query connector and the {} query operator were discovered. " +
+ "The query operator will be ignored and the embeddable query connector will be used instead.",
+ operatorMeta.getName());
+ }
}
} else {
LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", operatorMeta.getName());