You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/13 00:49:17 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1895 #resolve #comment refactor AbstractAppDataSnapshotServer to support override or provide QueryExecutor

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 40b0c42e4 -> 958471a4d


MLHR-1895 #resolve #comment refactor AbstractAppDataSnapshotServer to support override or provide QueryExecutor


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d9957541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d9957541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d9957541

Branch: refs/heads/devel-3
Commit: d9957541a6f0521b4d16a011f02f35a863c9a10b
Parents: 02f48e1
Author: bright <br...@bright-mac.local>
Authored: Wed Nov 11 13:44:36 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Thu Nov 12 15:05:12 2015 -0800

----------------------------------------------------------------------
 .../snapshot/AbstractAppDataSnapshotServer.java | 132 +++++++++++++------
 1 file changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d9957541/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index a309746..ded099b 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -19,20 +19,26 @@
 package com.datatorrent.lib.appdata.snapshot;
 
 import java.io.IOException;
-
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.validation.constraints.NotNull;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
 import com.datatorrent.lib.appdata.StoreUtils;
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
@@ -40,19 +46,17 @@ import com.datatorrent.lib.appdata.query.QueryExecutor;
 import com.datatorrent.lib.appdata.query.QueryManagerSynchronous;
 import com.datatorrent.lib.appdata.query.serde.MessageDeserializerFactory;
 import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory;
-import com.datatorrent.lib.appdata.schemas.*;
+import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
+import com.datatorrent.lib.appdata.schemas.DataResultSnapshot;
 import com.datatorrent.lib.appdata.schemas.Message;
 import com.datatorrent.lib.appdata.schemas.Query;
 import com.datatorrent.lib.appdata.schemas.Result;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
-import com.datatorrent.common.experimental.AppData;
-import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
+import com.datatorrent.lib.appdata.schemas.ResultFormatter;
+import com.datatorrent.lib.appdata.schemas.SchemaQuery;
+import com.datatorrent.lib.appdata.schemas.SchemaRegistry;
+import com.datatorrent.lib.appdata.schemas.SchemaRegistrySingle;
+import com.datatorrent.lib.appdata.schemas.SchemaResult;
+import com.datatorrent.lib.appdata.schemas.SnapshotSchema;
 
 /**
  * This is an abstract operator for the {@link SnapshotSchema}. This operator is designed to accept input data
@@ -67,7 +71,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   /**
    * The {@link QueryManagerSynchronous} for the operator.
    */
-  private transient QueryManagerSynchronous<Query, Void, MutableLong, Result> queryProcessor;
+  protected transient QueryManagerSynchronous<Query, Void, MutableLong, Result> queryProcessor;
   /**
    * The {@link MessageDeserializerFactory} for the operator.
    */
@@ -91,13 +95,18 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   /**
    * The current data to be served by the operator.
    */
-  private List<GPOMutable> currentData = Lists.newArrayList();
+  protected List<GPOMutable> currentData = Lists.newArrayList();
   private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
   private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
 
   @AppData.ResultPort
   public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
 
+  /**
+   * The queryExecutor executes the query and returns the result.
+   */
+  protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor;
+  
   @AppData.QueryPort
   @InputPortFieldAnnotation(optional=true)
   public transient final DefaultInputPort<String> query = new DefaultInputPort<String>()
@@ -105,44 +114,59 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     @Override
     public void process(String queryJSON)
     {
-      LOG.debug("query {}", queryJSON);
-      Message query = null;
-
-      try {
-        query = queryDeserializerFactory.deserialize(queryJSON);
-      } catch (IOException ex) {
-        LOG.error("Error parsing query: {}", queryJSON);
-        LOG.error("{}", ex);
-        return;
-      }
+      processQuery(queryJSON);
+    }
+  };
+  
+  /**
+   * Processes the query that was sent.
+   * This method is provided to give subclasses a chance to override it.
+   * @param queryJSON
+   */
+  protected void processQuery(String queryJSON)
+  {
+    LOG.debug("query {}", queryJSON);
+    Message query = null;
+
+    try {
+      query = queryDeserializerFactory.deserialize(queryJSON);
+    } catch (IOException ex) {
+      LOG.error("Error parsing query: {}", queryJSON);
+      LOG.error("{}", ex);
+      return;
+    }
 
-      if (query instanceof SchemaQuery) {
-        SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
+    if (query instanceof SchemaQuery) {
+      SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
 
-        if (schemaResult != null) {
-          LOG.debug("queueing {}", schemaResult);
-          schemaQueue.add(schemaResult);
-        }
-      } else if (query instanceof DataQuerySnapshot) {
-        queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
+      if (schemaResult != null) {
+        LOG.debug("queueing {}", schemaResult);
+        schemaQueue.add(schemaResult);
       }
+    } else if (query instanceof DataQuerySnapshot) {
+      queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
     }
-  };
+  }
 
   public transient final DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>()
   {
     @Override
     public void process(List<INPUT_EVENT> rows)
     {
-      currentData.clear();
-
-      for(INPUT_EVENT inputEvent: rows) {
-        GPOMutable gpoRow = convert(inputEvent);
-        currentData.add(gpoRow);
-      }
+      processData(rows);
     }
   };
 
+  protected void processData(List<INPUT_EVENT> rows)
+  {
+    currentData.clear();
+
+    for (INPUT_EVENT inputEvent : rows) {
+      GPOMutable gpoRow = convert(inputEvent);
+      currentData.add(gpoRow);
+    }
+  }
+  
   /**
    * Create operator.
    */
@@ -174,8 +198,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     schema = new SnapshotSchema(snapshotSchemaJSON);
     schemaRegistry = new SchemaRegistrySingle(schema);
     //Setup for query processing
-    queryProcessor = QueryManagerSynchronous.newInstance(new SnapshotComputer(), new AppDataWindowEndQueueManager<Query, Void>());
-
+    setupQueryProcessor();
+    
     queryDeserializerFactory = new MessageDeserializerFactory(SchemaQuery.class,
                                                            DataQuerySnapshot.class);
     queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
@@ -190,6 +214,12 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
       embeddableQueryInfoProvider.setup(context);
     }
   }
+  
+  protected void setupQueryProcessor()
+  {
+    queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor, 
+        new AppDataWindowEndQueueManager<Query, Void>());
+  }
 
   @Override
   public void beginWindow(long windowId)
@@ -312,4 +342,20 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractAppDataSnapshotServer.class);
+
+  public QueryExecutor<Query, Void, MutableLong, Result> getQueryExecutor()
+  {
+    return queryExecutor;
+  }
+
+  public void setQueryExecutor(QueryExecutor<Query, Void, MutableLong, Result> queryExecutor)
+  {
+    this.queryExecutor = queryExecutor;
+  }
+
+  public List<GPOMutable> getCurrentData()
+  {
+    return currentData;
+  }
+  
 }


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1895-PR' into devel-3

Posted by ti...@apache.org.
Merge branch 'MLHR-1895-PR' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/958471a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/958471a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/958471a4

Branch: refs/heads/devel-3
Commit: 958471a4dbd3b473e027602e28ede004d1c992c0
Parents: 40b0c42 d995754
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Nov 12 15:44:41 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Nov 12 15:44:41 2015 -0800

----------------------------------------------------------------------
 .../snapshot/AbstractAppDataSnapshotServer.java | 132 +++++++++++++------
 1 file changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------