You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/13 00:49:17 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1895 #resolve #comment
refactor AbstractAppDataSnapshotServer to support override or provide
QueryExecutor
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 40b0c42e4 -> 958471a4d
MLHR-1895 #resolve #comment refactor AbstractAppDataSnapshotServer to support override or provide QueryExecutor
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d9957541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d9957541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d9957541
Branch: refs/heads/devel-3
Commit: d9957541a6f0521b4d16a011f02f35a863c9a10b
Parents: 02f48e1
Author: bright <br...@bright-mac.local>
Authored: Wed Nov 11 13:44:36 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Thu Nov 12 15:05:12 2015 -0800
----------------------------------------------------------------------
.../snapshot/AbstractAppDataSnapshotServer.java | 132 +++++++++++++------
1 file changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d9957541/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index a309746..ded099b 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -19,20 +19,26 @@
package com.datatorrent.lib.appdata.snapshot;
import java.io.IOException;
-
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.validation.constraints.NotNull;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.mutable.MutableLong;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
import com.datatorrent.lib.appdata.StoreUtils;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
@@ -40,19 +46,17 @@ import com.datatorrent.lib.appdata.query.QueryExecutor;
import com.datatorrent.lib.appdata.query.QueryManagerSynchronous;
import com.datatorrent.lib.appdata.query.serde.MessageDeserializerFactory;
import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory;
-import com.datatorrent.lib.appdata.schemas.*;
+import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
+import com.datatorrent.lib.appdata.schemas.DataResultSnapshot;
import com.datatorrent.lib.appdata.schemas.Message;
import com.datatorrent.lib.appdata.schemas.Query;
import com.datatorrent.lib.appdata.schemas.Result;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
-import com.datatorrent.common.experimental.AppData;
-import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
+import com.datatorrent.lib.appdata.schemas.ResultFormatter;
+import com.datatorrent.lib.appdata.schemas.SchemaQuery;
+import com.datatorrent.lib.appdata.schemas.SchemaRegistry;
+import com.datatorrent.lib.appdata.schemas.SchemaRegistrySingle;
+import com.datatorrent.lib.appdata.schemas.SchemaResult;
+import com.datatorrent.lib.appdata.schemas.SnapshotSchema;
/**
* This is an abstract operator for the {@link SnapshotSchema}. This operator is designed to accept input data
@@ -67,7 +71,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
/**
* The {@link QueryManagerSynchronous} for the operator.
*/
- private transient QueryManagerSynchronous<Query, Void, MutableLong, Result> queryProcessor;
+ protected transient QueryManagerSynchronous<Query, Void, MutableLong, Result> queryProcessor;
/**
* The {@link MessageDeserializerFactory} for the operator.
*/
@@ -91,13 +95,18 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
/**
* The current data to be served by the operator.
*/
- private List<GPOMutable> currentData = Lists.newArrayList();
+ protected List<GPOMutable> currentData = Lists.newArrayList();
private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
@AppData.ResultPort
public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
+ /**
+ * The queryExecutor executes the query and returns the result.
+ */
+ protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor;
+
@AppData.QueryPort
@InputPortFieldAnnotation(optional=true)
public transient final DefaultInputPort<String> query = new DefaultInputPort<String>()
@@ -105,44 +114,59 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
public void process(String queryJSON)
{
- LOG.debug("query {}", queryJSON);
- Message query = null;
-
- try {
- query = queryDeserializerFactory.deserialize(queryJSON);
- } catch (IOException ex) {
- LOG.error("Error parsing query: {}", queryJSON);
- LOG.error("{}", ex);
- return;
- }
+ processQuery(queryJSON);
+ }
+ };
+
+ /**
+ * Processes the received query.
+ * This method is provided so that subclasses can override how queries are handled.
+ * @param queryJSON
+ */
+ protected void processQuery(String queryJSON)
+ {
+ LOG.debug("query {}", queryJSON);
+ Message query = null;
+
+ try {
+ query = queryDeserializerFactory.deserialize(queryJSON);
+ } catch (IOException ex) {
+ LOG.error("Error parsing query: {}", queryJSON);
+ LOG.error("{}", ex);
+ return;
+ }
- if (query instanceof SchemaQuery) {
- SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
+ if (query instanceof SchemaQuery) {
+ SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
- if (schemaResult != null) {
- LOG.debug("queueing {}", schemaResult);
- schemaQueue.add(schemaResult);
- }
- } else if (query instanceof DataQuerySnapshot) {
- queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
+ if (schemaResult != null) {
+ LOG.debug("queueing {}", schemaResult);
+ schemaQueue.add(schemaResult);
}
+ } else if (query instanceof DataQuerySnapshot) {
+ queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
}
- };
+ }
public transient final DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>()
{
@Override
public void process(List<INPUT_EVENT> rows)
{
- currentData.clear();
-
- for(INPUT_EVENT inputEvent: rows) {
- GPOMutable gpoRow = convert(inputEvent);
- currentData.add(gpoRow);
- }
+ processData(rows);
}
};
+ protected void processData(List<INPUT_EVENT> rows)
+ {
+ currentData.clear();
+
+ for (INPUT_EVENT inputEvent : rows) {
+ GPOMutable gpoRow = convert(inputEvent);
+ currentData.add(gpoRow);
+ }
+ }
+
/**
* Create operator.
*/
@@ -174,8 +198,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
schema = new SnapshotSchema(snapshotSchemaJSON);
schemaRegistry = new SchemaRegistrySingle(schema);
//Setup for query processing
- queryProcessor = QueryManagerSynchronous.newInstance(new SnapshotComputer(), new AppDataWindowEndQueueManager<Query, Void>());
-
+ setupQueryProcessor();
+
queryDeserializerFactory = new MessageDeserializerFactory(SchemaQuery.class,
DataQuerySnapshot.class);
queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
@@ -190,6 +214,12 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
embeddableQueryInfoProvider.setup(context);
}
}
+
+ protected void setupQueryProcessor()
+ {
+ queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor,
+ new AppDataWindowEndQueueManager<Query, Void>());
+ }
@Override
public void beginWindow(long windowId)
@@ -312,4 +342,20 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractAppDataSnapshotServer.class);
+
+ public QueryExecutor<Query, Void, MutableLong, Result> getQueryExecutor()
+ {
+ return queryExecutor;
+ }
+
+ public void setQueryExecutor(QueryExecutor<Query, Void, MutableLong, Result> queryExecutor)
+ {
+ this.queryExecutor = queryExecutor;
+ }
+
+ public List<GPOMutable> getCurrentData()
+ {
+ return currentData;
+ }
+
}
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1895-PR'
into devel-3
Posted by ti...@apache.org.
Merge branch 'MLHR-1895-PR' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/958471a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/958471a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/958471a4
Branch: refs/heads/devel-3
Commit: 958471a4dbd3b473e027602e28ede004d1c992c0
Parents: 40b0c42 d995754
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Nov 12 15:44:41 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Nov 12 15:44:41 2015 -0800
----------------------------------------------------------------------
.../snapshot/AbstractAppDataSnapshotServer.java | 132 +++++++++++++------
1 file changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------