You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/17 23:10:54 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1906 #resolve #comment
Snapshot Server support tags
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 658118a6f -> e0338537c
MLHR-1906 #resolve #comment Snapshot Server support tags
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3738eccb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3738eccb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3738eccb
Branch: refs/heads/devel-3
Commit: 3738eccbad5a7734ca98496c632a65dea40112e5
Parents: 7803359
Author: bright <br...@bright-mac.local>
Authored: Mon Nov 16 16:16:18 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Tue Nov 17 11:37:23 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/appdata/schemas/Schema.java | 3 +-
.../lib/appdata/schemas/SnapshotSchema.java | 24 +++-
.../snapshot/AbstractAppDataSnapshotServer.java | 38 +++++--
.../AppDataSnapshotServerTagsSupportTest.java | 111 +++++++++++++++++++
.../satisfactionRatingSnapshotSchema_test.json | 8 ++
...actionRatingWithTagsSnapshotSchema_test.json | 9 ++
6 files changed, 178 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
index f4dd1a2..e2e21da 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
@@ -33,7 +33,8 @@ public interface Schema
public static final String FIELD_SCHEMA_KEYS = "schemaKeys";
public static final String FIELD_SCHEMA = "schema";
-
+ public static final String FIELD_SCHEMA_TAGS = "tags";
+
/**
* The id of the schema. This is relevant for operators which support serving multiple schemas,
* in which each schema will need a unique id.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
index c2c6eb2..aed9013 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.appdata.schemas;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -198,12 +199,6 @@ public class SnapshotSchema implements Schema
{
schema = new JSONObject(schemaJSON);
- Preconditions.checkState(schema.length() == NUM_KEYS_FIRST_LEVEL,
- "Expected "
- + NUM_KEYS_FIRST_LEVEL
- + " keys in the first level but found "
- + schema.length());
-
if(schemaKeys != null) {
schema.put(Schema.FIELD_SCHEMA_KEYS,
SchemaUtils.createJSONObject(schemaKeys));
@@ -246,6 +241,23 @@ public class SnapshotSchema implements Schema
schemaJSON = this.schema.toString();
}
+  /**
+   * Sets the top-level tags of this schema and refreshes the cached JSON text.
+   * @param tags The tags to store under the {@code tags} key; must be neither null nor empty.
+   * @throws IllegalArgumentException if {@code tags} is null or empty.
+   * @throws IllegalStateException if the tags could not be written into the schema JSON.
+   */
+  public void setTags(Set<String> tags)
+  {
+    if (tags == null || tags.isEmpty()) {
+      throw new IllegalArgumentException("tags can't be null or empty.");
+    }
+
+    try {
+      schema.put(FIELD_SCHEMA_TAGS, new JSONArray(tags));
+    } catch (JSONException e) {
+      // The previous Preconditions.checkState(false, ...) always threw an
+      // IllegalStateException, which dropped the cause and left the following throw
+      // unreachable. Throw the same exception type and message, with the cause attached.
+      throw new IllegalStateException(String.valueOf(e.getMessage()), e);
+    }
+
+    // Keep the serialized form in sync with the modified schema object.
+    schemaJSON = schema.toString();
+  }
+
/**
* This is a helper method which sets the JSON that represents this schema.
* @param schemaJSON The JSON that represents this schema.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index ded099b..236735f 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.appdata.snapshot;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.validation.constraints.NotNull;
@@ -79,25 +80,25 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
/**
* The {@link MessageSerializerFactory} for the operator.
*/
- private transient MessageSerializerFactory resultSerializerFactory;
+ protected transient MessageSerializerFactory resultSerializerFactory;
/**
* The {@link SchemaRegistry} for the operator.
*/
- private transient SchemaRegistry schemaRegistry;
+ protected transient SchemaRegistry schemaRegistry;
/**
* The schema for the operator.
*/
protected transient SnapshotSchema schema;
@NotNull
- private ResultFormatter resultFormatter = new ResultFormatter();
- private String snapshotSchemaJSON;
+ protected ResultFormatter resultFormatter = new ResultFormatter();
+ protected String snapshotSchemaJSON;
/**
* The current data to be served by the operator.
*/
protected List<GPOMutable> currentData = Lists.newArrayList();
- private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
- private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
+ protected EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
+ protected final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
@AppData.ResultPort
public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
@@ -106,6 +107,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
* The queryExecutor executes the query and returns the result.
*/
protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor;
+
+ private Set<String> tags;
@AppData.QueryPort
@InputPortFieldAnnotation(optional=true)
@@ -195,7 +198,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
public void setup(OperatorContext context)
{
- schema = new SnapshotSchema(snapshotSchemaJSON);
+ setupSchema();
+
schemaRegistry = new SchemaRegistrySingle(schema);
//Setup for query processing
setupQueryProcessor();
@@ -214,7 +218,14 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
embeddableQueryInfoProvider.setup(context);
}
}
-
+
+  /**
+   * Creates {@link #schema} from the configured snapshot schema JSON and, when tags
+   * have been set on this operator, applies them to the newly created schema.
+   */
+  protected void setupSchema()
+  {
+    schema = new SnapshotSchema(snapshotSchemaJSON);
+
+    final boolean hasTags = tags != null && !tags.isEmpty();
+    if (hasTags) {
+      schema.setTags(tags);
+    }
+  }
+
protected void setupQueryProcessor()
{
queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor,
@@ -357,5 +368,16 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
{
return currentData;
}
+
+ /**
+ * Gets the tags configured on this operator.
+ * @return The configured tags, or null if none have been set.
+ */
+ public Set<String> getTags()
+ {
+ return tags;
+ }
+
+ /**
+ * Sets the tags for this operator. When the set is non-null and non-empty,
+ * {@link #setupSchema()} applies it to the schema it creates.
+ * @param tags The tags to store on this operator; may be null.
+ */
+ public void setTags(Set<String> tags)
+ {
+ this.tags = tags;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java
new file mode 100644
index 0000000..5ae882c
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.appdata.snapshot;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.Schema;
+import com.datatorrent.lib.appdata.schemas.SchemaResult;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+
+/**
+ * Verifies that a snapshot server reports schema-level tags in its schema query result,
+ * both when the tags are set on the operator and when they are part of the schema JSON.
+ */
+public class AppDataSnapshotServerTagsSupportTest
+{
+  /**
+   * A snapshot server that keeps the last serialized schema result so the test can inspect it.
+   */
+  public static class AppDataSnapshotServerSchemaExport extends AbstractAppDataSnapshotServer<Object>
+  {
+    SchemaResult schemaResult;
+    String schemaResultJSON;
+
+    @Override
+    public GPOMutable convert(Object inputEvent)
+    {
+      // This test never feeds input events to the server.
+      return null;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      // Drain pending schema results, remembering the last serialized one for assertions.
+      while ((schemaResult = schemaQueue.poll()) != null) {
+        schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
+        queryResult.emit(schemaResultJSON);
+      }
+
+      queryProcessor.endWindow();
+    }
+  }
+
+  private static final String SCHEMA_LOCATION = "satisfactionRatingSnapshotSchema_test.json";
+  private static final String SCHEMA_WITH_TAGS_LOCATION = "satisfactionRatingWithTagsSnapshotSchema_test.json";
+  private static final String TAG = "bulletin";
+
+  /**
+   * Issues a schema query against each server and checks that the first schema-level tag
+   * in the result is {@link #TAG}.
+   */
+  @Test
+  public void testSchema() throws Exception
+  {
+    for (AppDataSnapshotServerSchemaExport snapshotServer : getSnapshotServers()) {
+      snapshotServer.setup(null);
+
+      snapshotServer.processQuery("{\"id\":123, \"type\":\"schemaQuery\"}");
+      snapshotServer.beginWindow(0L);
+      snapshotServer.endWindow();
+
+      String result = snapshotServer.schemaResultJSON;
+      // Fail with a clear message instead of an exception from the JSON parser.
+      Assert.assertNotNull("No schema result", result);
+
+      JSONObject json = new JSONObject(result);
+      JSONObject jsonData = (JSONObject)json.getJSONArray("data").get(0);
+      JSONArray tags = jsonData.getJSONArray(Schema.FIELD_SCHEMA_TAGS);
+      Assert.assertNotNull("No tags", tags);
+      // JUnit's assertEquals takes (message, expected, actual).
+      Assert.assertEquals("Invalid tag.", TAG, tags.get(0));
+    }
+  }
+
+  /**
+   * Builds the servers under test.
+   * @return Two servers: one whose tag is set on the operator, and one whose tag comes
+   * from the schema JSON resource.
+   */
+  protected AppDataSnapshotServerSchemaExport[] getSnapshotServers()
+  {
+    // Server 0: the schema resource has no top-level tags; the tag is set on the operator.
+    AppDataSnapshotServerSchemaExport taggedByOperator = new AppDataSnapshotServerSchemaExport();
+    taggedByOperator.setSnapshotSchemaJSON(SchemaUtils.jarResourceFileToString(SCHEMA_LOCATION));
+    taggedByOperator.setTags(Sets.newHashSet(TAG));
+
+    // Server 1: the tag is already part of the schema resource; none is set on the operator.
+    AppDataSnapshotServerSchemaExport taggedBySchema = new AppDataSnapshotServerSchemaExport();
+    taggedBySchema.setSnapshotSchemaJSON(SchemaUtils.jarResourceFileToString(SCHEMA_WITH_TAGS_LOCATION));
+
+    return new AppDataSnapshotServerSchemaExport[] {taggedByOperator, taggedBySchema};
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json b/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json
new file mode 100644
index 0000000..5733e6c
--- /dev/null
+++ b/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json
@@ -0,0 +1,8 @@
+{
+ "values": [
+ {"name": "current", "type": "long", "tags": ["current"]},
+ {"name": "min", "type": "long", "tags": ["min"]},
+ {"name": "max", "type": "long", "tags": ["max"]},
+ {"name": "threshold", "type": "long", "tags": ["threshold"]}
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json b/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json
new file mode 100644
index 0000000..7c04db5
--- /dev/null
+++ b/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json
@@ -0,0 +1,9 @@
+{
+ "tags": ["bulletin"],
+ "values": [
+ {"name": "current", "type": "long", "tags": ["current"]},
+ {"name": "min", "type": "long", "tags": ["min"]},
+ {"name": "max", "type": "long", "tags": ["max"]},
+ {"name": "threshold", "type": "long", "tags": ["threshold"]}
+ ]
+}
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1906' into
devel-3
Posted by ti...@apache.org.
Merge branch 'MLHR-1906' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e0338537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e0338537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e0338537
Branch: refs/heads/devel-3
Commit: e0338537cd11204f66d86c470c883e999f1b8606
Parents: 658118a 3738ecc
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Nov 17 13:51:20 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Nov 17 13:51:20 2015 -0800
----------------------------------------------------------------------
.../datatorrent/lib/appdata/schemas/Schema.java | 3 +-
.../lib/appdata/schemas/SnapshotSchema.java | 24 +++-
.../snapshot/AbstractAppDataSnapshotServer.java | 38 +++++--
.../AppDataSnapshotServerTagsSupportTest.java | 111 +++++++++++++++++++
.../satisfactionRatingSnapshotSchema_test.json | 8 ++
...actionRatingWithTagsSnapshotSchema_test.json | 9 ++
6 files changed, 178 insertions(+), 15 deletions(-)
----------------------------------------------------------------------