You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/29 00:02:27 UTC

[11/11] incubator-apex-core git commit: APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.

APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d748ed46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d748ed46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d748ed46

Branch: refs/heads/devel-3
Commit: d748ed46fb2ef91d74a70c2be52dc4a56bb4df71
Parents: aceaeeb
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Aug 7 17:47:46 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 14:19:55 2015 -0700

----------------------------------------------------------------------
 .../common/experimental/AppData.java            | 53 +++++++++++++++++++-
 .../stram/StreamingContainerManager.java        | 31 ++++++++++--
 2 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/common/src/main/java/com/datatorrent/common/experimental/AppData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index fbdc82c..22259c5 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -15,13 +15,15 @@
  */
 package com.datatorrent.common.experimental;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
@@ -33,6 +35,55 @@ import org.apache.hadoop.classification.InterfaceStability;
 public interface AppData
 {
   /**
+   * This interface is for App Data stores which support embedding a query operator.
+   * @param <QUERY_TYPE> The type of the query tuple emitted by the embedded query operator.
+   */
+  interface Store<QUERY_TYPE> extends Operator.ActivationListener<OperatorContext>
+  {
+    /**
+     * Gets the query connector which is used by the store operator to receive queries. If this method returns
+     * null then this Store should have a separate query operator connected to it.
+     * @return The query connector which is used by the store operator to receive queries.
+     */
+    public EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider();
+
+    /**
+     * Sets the query connector which is used by the store operator to receive queries. The store operator will call
+     * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method of the embeddable query operator before
+     * its {@link Operator#setup} method is called.
+     * @param embeddableQueryInfoProvider The query connector which is used by the store operator to receive queries.
+     */
+    public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider);
+  }
+
+  /**
+   * This interface represents a query operator which can be embedded into an AppData data source. This operator could also
+   * be used as a standalone operator. The distinction between being used in a standalone or embedded context is made by
+   * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method. If this method is called at least once then the {@link EmbeddableQueryInfoProvider}
+   * will operate as if it were embedded in an {@link AppData.Store} operator. If this method is never called then the operator will behave as if
+   * it were a standalone operator.<br/><br/>
+   * <b>Note:</b> When an {@link EmbeddableQueryInfoProvider} is set on an {@link AppData.Store} then its {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+   * method is called before {@link Operator#setup}.
+   * @param <QUERY_TYPE> The type of the query emitted by the operator.
+   */
+  interface EmbeddableQueryInfoProvider<QUERY_TYPE> extends Operator, ConnectionInfoProvider, Operator.ActivationListener<OperatorContext>
+  {
+    /**
+     * Gets the output port for queries.
+     * @return The output port for queries.
+     */
+    public DefaultOutputPort<QUERY_TYPE> getOutputPort();
+
+    /**
+     * If this method is called at least once then this operator will work as if it were embedded in an {@link AppData.Store}.
+     * If this method is never called then this operator will behave as a standalone operator. When an {@link EmbeddableQueryInfoProvider}
+     * is set on an {@link AppData.Store} then the {@link AppData.Store} will call the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+     * method once before {@link Operator#setup} is called.
+     */
+    public void enableEmbeddedMode();
+  }
+
+  /**
    * This interface should be implemented by AppData Query and Result operators.
    */
   interface ConnectionInfoProvider

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 7002c1d..7944a4b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -132,6 +132,7 @@ public class StreamingContainerManager implements PlanContext
   public final static String APP_META_FILENAME = "meta.json";
   public final static String APP_META_KEY_ATTRIBUTES = "attributes";
   public final static String APP_META_KEY_METRICS = "metrics";
+  public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
 
   public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes
   public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
@@ -567,6 +568,22 @@ public class StreamingContainerManager implements PlanContext
           String queryUrl = null;
           String queryTopic = null;
 
+          boolean hasEmbeddedQuery = false;
+
+          //Discover embeddable query connectors
+          if (operatorMeta.getOperator() instanceof AppData.Store<?>) {
+            AppData.Store<?> store = (AppData.Store<?>)operatorMeta.getOperator();
+            AppData.EmbeddableQueryInfoProvider<?> embeddableQuery = store.getEmbeddableQueryInfoProvider();
+
+            if (embeddableQuery != null) {
+              hasEmbeddedQuery = true;
+              queryOperatorName = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX;
+              queryUrl = embeddableQuery.getAppDataURL();
+              queryTopic = embeddableQuery.getTopic();
+            }
+          }
+
+          //Discover separate query operators
           LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId());
           for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) {
             LogicalPlan.InputPortMeta portMeta = entry.getKey();
@@ -574,10 +591,16 @@ public class StreamingContainerManager implements PlanContext
               if (queryUrl == null) {
                 OperatorMeta queryOperatorMeta = entry.getValue().getSource().getOperatorMeta();
                 if (queryOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
-                  AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider) queryOperatorMeta.getOperator();
-                  queryOperatorName = queryOperatorMeta.getName();
-                  queryUrl = queryOperator.getAppDataURL();
-                  queryTopic = queryOperator.getTopic();
+                  if (!hasEmbeddedQuery) {
+                    AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider)queryOperatorMeta.getOperator();
+                    queryOperatorName = queryOperatorMeta.getName();
+                    queryUrl = queryOperator.getAppDataURL();
+                    queryTopic = queryOperator.getTopic();
+                  } else {
+                    LOG.warn("An embeddable query connector and the {} query operator were discovered. " +
+                             "The query operator will be ignored and the embeddable query connector will be used instead.",
+                             operatorMeta.getName());
+                  }
                 }
               } else {
                 LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", operatorMeta.getName());