You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/17 23:10:54 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1906 #resolve #comment Snapshot Server support tags

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 658118a6f -> e0338537c


MLHR-1906 #resolve #comment Snapshot Server support tags


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3738eccb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3738eccb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3738eccb

Branch: refs/heads/devel-3
Commit: 3738eccbad5a7734ca98496c632a65dea40112e5
Parents: 7803359
Author: bright <br...@bright-mac.local>
Authored: Mon Nov 16 16:16:18 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Tue Nov 17 11:37:23 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/appdata/schemas/Schema.java |   3 +-
 .../lib/appdata/schemas/SnapshotSchema.java     |  24 +++-
 .../snapshot/AbstractAppDataSnapshotServer.java |  38 +++++--
 .../AppDataSnapshotServerTagsSupportTest.java   | 111 +++++++++++++++++++
 .../satisfactionRatingSnapshotSchema_test.json  |   8 ++
 ...actionRatingWithTagsSnapshotSchema_test.json |   9 ++
 6 files changed, 178 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
index f4dd1a2..e2e21da 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java
@@ -33,7 +33,8 @@ public interface Schema
 
   public static final String FIELD_SCHEMA_KEYS = "schemaKeys";
   public static final String FIELD_SCHEMA = "schema";
-
+  public static final String FIELD_SCHEMA_TAGS = "tags";
+  
   /**
    * The id of the schema. This is relevant for operators which support serving multiple schemas,
    * in which each schema will need a unique id.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
index c2c6eb2..aed9013 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.appdata.schemas;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -198,12 +199,6 @@ public class SnapshotSchema implements Schema
   {
     schema = new JSONObject(schemaJSON);
 
-    Preconditions.checkState(schema.length() == NUM_KEYS_FIRST_LEVEL,
-                             "Expected "
-                             + NUM_KEYS_FIRST_LEVEL
-                             + " keys in the first level but found "
-                             + schema.length());
-
     if(schemaKeys != null) {
       schema.put(Schema.FIELD_SCHEMA_KEYS,
                  SchemaUtils.createJSONObject(schemaKeys));
@@ -246,6 +241,23 @@ public class SnapshotSchema implements Schema
     schemaJSON = this.schema.toString();
   }
 
+  /**
+   * Sets the top-level tags (must be non-null and non-empty) and refreshes the schema JSON.
+   */
+  public void setTags(Set<String> tags)
+  {
+    if (tags == null || tags.isEmpty()) {
+      throw new IllegalArgumentException("tags can't be null or empty.");
+    }
+    try {
+      schema.put(FIELD_SCHEMA_TAGS, new JSONArray(tags));
+    } catch (JSONException e) {
+      // Same exception type and message as before, but the cause is no longer dropped.
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+    schemaJSON = schema.toString();
+  }
+  
   /**
    * This is a helper method which sets the JSON that represents this schema.
    * @param schemaJSON The JSON that represents this schema.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index ded099b..236735f 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.appdata.snapshot;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.validation.constraints.NotNull;
@@ -79,25 +80,25 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   /**
    * The {@link MessageSerializerFactory} for the operator.
    */
-  private transient MessageSerializerFactory resultSerializerFactory;
+  protected transient MessageSerializerFactory resultSerializerFactory;
   /**
    * The {@link SchemaRegistry} for the operator.
    */
-  private transient SchemaRegistry schemaRegistry;
+  protected transient SchemaRegistry schemaRegistry;
   /**
    * The schema for the operator.
    */
   protected transient SnapshotSchema schema;
 
   @NotNull
-  private ResultFormatter resultFormatter = new ResultFormatter();
-  private String snapshotSchemaJSON;
+  protected ResultFormatter resultFormatter = new ResultFormatter();
+  protected String snapshotSchemaJSON;
   /**
    * The current data to be served by the operator.
    */
   protected List<GPOMutable> currentData = Lists.newArrayList();
-  private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
-  private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
+  protected EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
+  protected final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
 
   @AppData.ResultPort
   public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
@@ -106,6 +107,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
    * The queryExecutor execute the query and return the result.
    */
   protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor;
+   
+  private Set<String> tags;
   
   @AppData.QueryPort
   @InputPortFieldAnnotation(optional=true)
@@ -195,7 +198,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   public void setup(OperatorContext context)
   {
-    schema = new SnapshotSchema(snapshotSchemaJSON);
+    setupSchema();
+    
     schemaRegistry = new SchemaRegistrySingle(schema);
     //Setup for query processing
     setupQueryProcessor();
@@ -214,7 +218,14 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
       embeddableQueryInfoProvider.setup(context);
     }
   }
-  
+
+  protected void setupSchema()
+  {
+    schema = new SnapshotSchema(snapshotSchemaJSON);
+    if (tags != null && !tags.isEmpty())
+      schema.setTags(tags);
+  }
+
   protected void setupQueryProcessor()
   {
     queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor, 
@@ -357,5 +368,16 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   {
     return currentData;
   }
+
+  public Set<String> getTags()
+  {
+    return tags;
+  }
+
+  public void setTags(Set<String> tags)
+  {
+    this.tags = tags;
+  }
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java
new file mode 100644
index 0000000..5ae882c
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.appdata.snapshot;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.Schema;
+import com.datatorrent.lib.appdata.schemas.SchemaResult;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+
+public class AppDataSnapshotServerTagsSupportTest
+{
+  public static class AppDataSnapshotServerSchemaExport extends AbstractAppDataSnapshotServer<Object>
+  {
+    SchemaResult schemaResult;
+    String schemaResultJSON;
+
+    @Override
+    public GPOMutable convert(Object inputEvent)
+    {
+      return null;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      while ((schemaResult = schemaQueue.poll()) != null) {
+        schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
+        queryResult.emit(schemaResultJSON);
+      }
+
+      queryProcessor.endWindow();
+    }
+  }
+
+  private static final String schemaLocation = "satisfactionRatingSnapshotSchema_test.json";
+  private static final String TAG = "bulletin";
+
+  private static final String schemaWithTagsLocation = "satisfactionRatingWithTagsSnapshotSchema_test.json";
+
+  // Checks that the schema query result exposes TAG both when it is set through
+  // setTags and when it is declared directly in the schema JSON.
+  @Test
+  public void testSchema() throws Exception
+  {
+    for (AppDataSnapshotServerSchemaExport snapshotServer : getSnapshotServers()) {
+      snapshotServer.setup(null);
+      // Issue a schema query and run one window so endWindow captures the serialized result.
+      snapshotServer.processQuery("{\"id\":123, \"type\":\"schemaQuery\"}");
+      snapshotServer.beginWindow(0L);
+      snapshotServer.endWindow();
+
+      String result = snapshotServer.schemaResultJSON;
+
+      JSONObject json = new JSONObject(result);
+      JSONObject jsonData = (JSONObject)json.getJSONArray("data").get(0);
+      JSONArray tags = jsonData.getJSONArray(Schema.FIELD_SCHEMA_TAGS);
+      Assert.assertNotNull("No tags", tags);
+      Assert.assertEquals("Invalid tag.", TAG, tags.get(0));
+    }
+  }
+
+  protected AppDataSnapshotServerSchemaExport[] getSnapshotServers()
+  {
+    AppDataSnapshotServerSchemaExport[] snapshotServers = new AppDataSnapshotServerSchemaExport[2];
+
+    {
+      String schema = SchemaUtils.jarResourceFileToString(schemaLocation);
+
+      AppDataSnapshotServerSchemaExport snapshotServer = new AppDataSnapshotServerSchemaExport();
+
+      snapshotServer.setSnapshotSchemaJSON(schema);
+      snapshotServer.setTags(Sets.newHashSet(TAG));
+
+      snapshotServers[0] = snapshotServer;
+    }
+    {
+      String schema = SchemaUtils.jarResourceFileToString(schemaWithTagsLocation);
+
+      AppDataSnapshotServerSchemaExport snapshotServer = new AppDataSnapshotServerSchemaExport();
+
+      snapshotServer.setSnapshotSchemaJSON(schema);
+
+      snapshotServers[1] = snapshotServer;
+    }
+
+    return snapshotServers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json b/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json
new file mode 100644
index 0000000..5733e6c
--- /dev/null
+++ b/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json
@@ -0,0 +1,8 @@
+{
+  "values": [
+    {"name": "current", "type": "long", "tags": ["current"]}, 
+    {"name": "min", "type": "long", "tags": ["min"]}, 
+    {"name": "max", "type": "long", "tags": ["max"]}, 
+    {"name": "threshold", "type": "long", "tags": ["threshold"]}
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json b/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json
new file mode 100644
index 0000000..7c04db5
--- /dev/null
+++ b/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json
@@ -0,0 +1,9 @@
+{
+  "tags": ["bulletin"],
+  "values": [
+    {"name": "current", "type": "long", "tags": ["current"]}, 
+    {"name": "min", "type": "long", "tags": ["min"]}, 
+    {"name": "max", "type": "long", "tags": ["max"]}, 
+    {"name": "threshold", "type": "long", "tags": ["threshold"]}
+  ]
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1906' into devel-3

Posted by ti...@apache.org.
Merge branch 'MLHR-1906' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e0338537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e0338537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e0338537

Branch: refs/heads/devel-3
Commit: e0338537cd11204f66d86c470c883e999f1b8606
Parents: 658118a 3738ecc
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Nov 17 13:51:20 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Nov 17 13:51:20 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/appdata/schemas/Schema.java |   3 +-
 .../lib/appdata/schemas/SnapshotSchema.java     |  24 +++-
 .../snapshot/AbstractAppDataSnapshotServer.java |  38 +++++--
 .../AppDataSnapshotServerTagsSupportTest.java   | 111 +++++++++++++++++++
 .../satisfactionRatingSnapshotSchema_test.json  |   8 ++
 ...actionRatingWithTagsSnapshotSchema_test.json |   9 ++
 6 files changed, 178 insertions(+), 15 deletions(-)
----------------------------------------------------------------------