You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/05 14:46:45 UTC
[apex-malhar] branch master updated: APEXMALHAR-2530 Refactored
AbstractAppDataSnapshotServer so that subclasses don't need schemas
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 783b7fe APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas
783b7fe is described below
commit 783b7fe3a1d9dd9c3ef7803e049d7d317b2aefd1
Author: David Yan <da...@apache.org>
AuthorDate: Sat Jul 29 23:33:54 2017 -0700
APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas
---
.../snapshot/AbstractAppDataSnapshotServer.java | 35 +-----
.../lib/io/PubSubWebSocketAppDataQuery.java | 14 ---
.../lib/io/PubSubWebSocketAppDataResult.java | 16 +--
.../malhar/lib/appdata/AbstractAppDataServer.java | 125 +++++++++++++++++++++
4 files changed, 139 insertions(+), 51 deletions(-)
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index 19e142b..88d89c4 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -28,6 +28,7 @@ import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.appdata.AbstractAppDataServer;
import org.apache.commons.lang3.mutable.MutableLong;
import com.google.common.base.Preconditions;
@@ -36,11 +37,9 @@ import com.google.common.collect.Lists;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
-import com.datatorrent.lib.appdata.StoreUtils;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
import com.datatorrent.lib.appdata.query.QueryExecutor;
@@ -67,7 +66,7 @@ import com.datatorrent.lib.appdata.schemas.SnapshotSchema;
* @param <INPUT_EVENT> The type of the input events that the operator accepts.
* @since 3.0.0
*/
-public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator, AppData.Store<String>
+public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> extends AbstractAppDataServer<String>
{
/**
* The {@link QueryManagerSynchronous} for the operator.
@@ -198,6 +197,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
public void setup(OperatorContext context)
{
+ super.setup(context);
setupSchema();
schemaRegistry = new SchemaRegistrySingle(schema);
@@ -209,13 +209,6 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
resultSerializerFactory = new MessageSerializerFactory(resultFormatter);
queryProcessor.setup(context);
-
- if (embeddableQueryInfoProvider != null) {
- embeddableQueryInfoProvider.enableEmbeddedMode();
- LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName());
- StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), query);
- embeddableQueryInfoProvider.setup(context);
- }
}
protected void setupSchema()
@@ -235,19 +228,14 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
public void beginWindow(long windowId)
{
- if (embeddableQueryInfoProvider != null) {
- embeddableQueryInfoProvider.beginWindow(windowId);
- }
-
+ super.beginWindow(windowId);
queryProcessor.beginWindow(windowId);
}
@Override
public void endWindow()
{
- if (embeddableQueryInfoProvider != null) {
- embeddableQueryInfoProvider.endWindow();
- }
+ super.endWindow();
{
Result result;
@@ -275,21 +263,10 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
public void teardown()
{
- if (embeddableQueryInfoProvider != null) {
- embeddableQueryInfoProvider.teardown();
- }
-
+ super.teardown();
queryProcessor.teardown();
}
- @Override
- public void deactivate()
- {
- if (embeddableQueryInfoProvider != null) {
- embeddableQueryInfoProvider.deactivate();
- }
- }
-
/**
* Gets the JSON for the schema.
* @return the JSON for the schema.
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 7cf883f..3f2029e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
import javax.validation.constraints.Min;
-import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -158,19 +157,6 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
try {
JSONObject jo = new JSONObject(message);
- JSONArray ja = jo.names();
-
- //Make sure that only the correct keys are in the first level of JSON
- for (int keyIndex = 0; keyIndex < ja.length(); keyIndex++) {
- String key = ja.getString(keyIndex);
- if (!(PubSubMessage.DATA_KEY.equals(key) ||
- PubSubMessage.TOPIC_KEY.equals(key) ||
- PubSubMessage.TYPE_KEY.equals(key))) {
- logger.error("{} is not a valid key in the first level of the following pubsub message:\n{}", key, message);
- return null;
- }
- }
-
data = jo.getString(PubSubMessage.DATA_KEY);
} catch (JSONException e) {
return null;
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index e1f3fa1..7800fa4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -97,16 +97,16 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
throw new RuntimeException(ex);
}
- String id;
-
- try {
- id = jo.getString("id");
- } catch (JSONException ex) {
- throw new RuntimeException(ex);
+ String topic = getTopic();
+
+ if (jo.has("id")) {
+ try {
+ topic += "." + jo.getString("id");
+ } catch (JSONException ex) {
+ throw new RuntimeException(ex);
+ }
}
- String topic = getTopic() + "." + id;
-
JSONObject output = new JSONObject();
try {
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java b/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java
new file mode 100644
index 0000000..4b57d66
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.appdata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.lib.appdata.StoreUtils;
+
+/**
+ * This operator provides the framework for serving data in response to queries that come from an embeddable query
+ * info provider, which may be an input operator.
+ * Subclasses are expected to implement the processQuery method with the logic for handling a query. Note that
+ * processQuery cannot directly emit to the operator's output port because it is called from the thread of the
+ * embeddable query info provider.
+ */
+public abstract class AbstractAppDataServer<QueryType> implements Operator, AppData.Store<QueryType>
+{
+ @AppData.QueryPort
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<QueryType> query = new DefaultInputPort<QueryType>()
+ {
+ @Override
+ public void process(QueryType query)
+ {
+ processQuery(query); // every tuple received on the query port is handed to the subclass hook
+ }
+ };
+
+ /** Optional embedded query source; when non-null, each lifecycle callback below is forwarded to it. */
+ protected AppData.EmbeddableQueryInfoProvider<QueryType> embeddableQueryInfoProvider;
+
+ /** Handles a single query; invoked from the process method of the {@link #query} input port. */
+ protected abstract void processQuery(QueryType query);
+ /** Forwards activation to the embeddable query info provider, if one is set. */
+ @Override
+ public void activate(Context.OperatorContext ctx)
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.activate(ctx);
+ }
+ }
+ /** If a provider is set: enables its embedded mode, attaches its output port to {@link #query}, then sets it up. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.enableEmbeddedMode();
+ LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName());
+ StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), query);
+ embeddableQueryInfoProvider.setup(context);
+ }
+ }
+ /** Forwards the start of the window to the embeddable query info provider, if one is set. */
+ @Override
+ public void beginWindow(long windowId)
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.beginWindow(windowId);
+ }
+ }
+ /** Forwards the end of the window to the embeddable query info provider, if one is set. */
+ @Override
+ public void endWindow()
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.endWindow();
+ }
+ }
+ /** Forwards teardown to the embeddable query info provider, if one is set. */
+ @Override
+ public void teardown()
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.teardown();
+ }
+ }
+ /** Forwards deactivation to the embeddable query info provider, if one is set. */
+ @Override
+ public void deactivate()
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.deactivate();
+ }
+ }
+ /** Returns the configured embeddable query info provider, or null if none has been set. */
+ @Override
+ public AppData.EmbeddableQueryInfoProvider<QueryType> getEmbeddableQueryInfoProvider()
+ {
+ return embeddableQueryInfoProvider;
+ }
+ /** Sets the embeddable query info provider; the argument is passed through Preconditions.checkNotNull. */
+ @Override
+ public void setEmbeddableQueryInfoProvider(AppData.EmbeddableQueryInfoProvider<QueryType> embeddableQueryInfoProvider)
+ {
+ this.embeddableQueryInfoProvider = Preconditions.checkNotNull(embeddableQueryInfoProvider);
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractAppDataServer.class);
+
+}
--
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.