You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/12 23:10:50 UTC

[1/3] samza git commit: Revert "SAMZA-484; define serialization for tuples in samza-sql"

Repository: samza
Updated Branches:
  refs/heads/master 8090d6539 -> 6743df319


Revert "SAMZA-484; define serialization for tuples in samza-sql"

This reverts commit eedf2e7204fc01e32bd21c454ff83a36a23f6105.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f44a6929
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f44a6929
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f44a6929

Branch: refs/heads/master
Commit: f44a692914b3005cd85acd64a52be6415ae49c59
Parents: 8090d65
Author: Chris Riccomini <cr...@apache.org>
Authored: Thu Feb 12 14:10:33 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Feb 12 14:10:33 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  10 +-
 gradle/dependency-versions.gradle               |   1 -
 samza-sql/README                                |   1 +
 samza-sql/README.md                             |   1 -
 .../org/apache/samza/sql/api/data/Data.java     |  54 ----
 .../org/apache/samza/sql/api/data/Schema.java   |  55 ----
 .../org/apache/samza/sql/api/data/Tuple.java    |   4 +-
 .../samza/sql/data/IncomingMessageTuple.java    |   9 +-
 .../apache/samza/sql/data/avro/AvroData.java    | 262 ----------------
 .../apache/samza/sql/data/avro/AvroSchema.java  | 296 -------------------
 .../sql/data/serializers/SqlStringSerde.java    |  45 ---
 .../data/serializers/SqlStringSerdeFactory.java |  33 ---
 .../samza/sql/data/string/StringData.java       | 101 -------
 .../samza/sql/data/string/StringSchema.java     |  73 -----
 .../sql/operators/partition/PartitionOp.java    |   5 +-
 15 files changed, 18 insertions(+), 932 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b49c313..e6b10fc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -78,7 +78,7 @@ rat {
     'gradlew',
     'gradlew.bat',
     'samza-test/state/mystore/**',
-    '**/README.md',
+    'README.md',
     'RELEASE.md',
   ]
 }
@@ -249,12 +249,18 @@ project(":samza-yarn_$scalaVersion") {
 project(":samza-sql_$scalaVersion") {
   apply plugin: 'java'
 
+  configurations {
+    // Remove transitive dependencies from Zookeeper that we don't want.
+    compile.exclude group: 'javax.jms', module: 'jms'
+    compile.exclude group: 'com.sun.jdmk', module: 'jmxtools'
+    compile.exclude group: 'com.sun.jmx', module: 'jmxri'
+  }
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile project(":samza-kv_$scalaVersion")
     compile "commons-collections:commons-collections:$commonsCollectionVersion"
-    compile "org.apache.avro:avro:$avroVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 03c72f8..6f815b2 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -35,5 +35,4 @@
   guavaVersion = "17.0"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
-  avroVersion = "1.7.7"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/README
----------------------------------------------------------------------
diff --git a/samza-sql/README b/samza-sql/README
new file mode 100644
index 0000000..65b7558
--- /dev/null
+++ b/samza-sql/README
@@ -0,0 +1 @@
+samza-sql is an experimental module that is under development (SAMZA-390).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/README.md
----------------------------------------------------------------------
diff --git a/samza-sql/README.md b/samza-sql/README.md
deleted file mode 100644
index 598670b..0000000
--- a/samza-sql/README.md
+++ /dev/null
@@ -1 +0,0 @@
-samza-sql is an experimental module that is under development (SAMZA-390).

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
deleted file mode 100644
index d1b8409..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.List;
-import java.util.Map;
-
-
-public interface Data {
-
-  Schema schema();
-
-  Object value();
-
-  int intValue();
-
-  long longValue();
-
-  float floatValue();
-
-  double doubleValue();
-
-  boolean booleanValue();
-
-  String strValue();
-
-  byte[] bytesValue();
-
-  List<Object> arrayValue();
-
-  Map<Object, Object> mapValue();
-
-  Data getElement(int index);
-
-  Data getFieldData(String fldName);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
deleted file mode 100644
index 1e8f192..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.Map;
-
-
-public interface Schema {
-
-  enum Type {
-    INTEGER,
-    LONG,
-    FLOAT,
-    DOUBLE,
-    BOOLEAN,
-    STRING,
-    BYTES,
-    STRUCT,
-    ARRAY,
-    MAP
-  };
-
-  Type getType();
-
-  Schema getElementType();
-
-  Schema getValueType();
-
-  Map<String, Schema> getFields();
-
-  Schema getFieldType(String fldName);
-
-  Data read(Object object);
-
-  Data transform(Data inputData);
-
-  boolean equals(Schema other);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
index bc8efcf..0c21a53 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -32,7 +32,7 @@ public interface Tuple {
    *
    * @return Message object in the tuple
    */
-  Data getMessage();
+  Object getMessage();
 
   /**
    * Method to indicate whether the tuple is a delete tuple or an insert tuple
@@ -46,7 +46,7 @@ public interface Tuple {
    *
    * @return The <code>key</code> of the tuple
    */
-  Data getKey();
+  Object getKey();
 
   /**
    * Get the stream name of the tuple. Note this stream name should be unique in the system.

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index f868e5c..a8a55e2 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.sql.data;
 
-import org.apache.samza.sql.api.data.Data;
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -53,8 +52,8 @@ public class IncomingMessageTuple implements Tuple {
 
   // TODO: the return type should be changed to the generic data type
   @Override
-  public Data getMessage() {
-    return (Data) this.imsg.getMessage();
+  public Object getMessage() {
+    return this.imsg.getMessage();
   }
 
   @Override
@@ -63,8 +62,8 @@ public class IncomingMessageTuple implements Tuple {
   }
 
   @Override
-  public Data getKey() {
-    return (Data) this.imsg.getKey();
+  public Object getKey() {
+    return imsg.getKey();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
deleted file mode 100644
index d040be9..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.avro;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-
-public class AvroData implements Data {
-  protected final Object datum;
-  protected final AvroSchema schema;
-
-  private AvroData(AvroSchema schema, Object datum) {
-    this.datum = datum;
-    this.schema = schema;
-  }
-
-  @Override
-  public Schema schema() {
-    return this.schema;
-  }
-
-  @Override
-  public Object value() {
-    return this.datum;
-  }
-
-  @Override
-  public int intValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public long longValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public float floatValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public double doubleValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public boolean booleanValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public String strValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public byte[] bytesValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public List<Object> arrayValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Map<Object, Object> mapValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Data getElement(int index) {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  @Override
-  public Data getFieldData(String fldName) {
-    throw new UnsupportedOperationException("Can't get value for an unknown data type.");
-  }
-
-  public static AvroData getArray(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.ARRAY) {
-      throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
-
-      @Override
-      public List<Object> arrayValue() {
-        return this.array;
-      }
-
-      @Override
-      public Data getElement(int index) {
-        return this.schema.getElementType().read(array.get(index));
-      }
-
-    };
-  }
-
-  public static AvroData getMap(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.MAP) {
-      throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final Map<Object, Object> map = (Map<Object, Object>) datum;
-
-      @Override
-      public Map<Object, Object> mapValue() {
-        return this.map;
-      }
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getValueType().read(map.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getStruct(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      private final GenericRecord record = (GenericRecord) datum;
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getFieldType(fldName).read(record.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getInt(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public int intValue() {
-        return ((Integer) datum).intValue();
-      }
-
-    };
-  }
-
-  public static AvroData getLong(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public long longValue() {
-        return ((Long) datum).longValue();
-      }
-
-    };
-  }
-
-  public static AvroData getFloat(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public float floatValue() {
-        return ((Float) datum).floatValue();
-      }
-
-    };
-  }
-
-  public static AvroData getDouble(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public double doubleValue() {
-        return ((Double) datum).doubleValue();
-      }
-
-    };
-  }
-
-  public static AvroData getBoolean(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public boolean booleanValue() {
-        return ((Boolean) datum).booleanValue();
-      }
-
-    };
-  }
-
-  public static AvroData getString(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public String strValue() {
-        return ((CharSequence) datum).toString();
-      }
-
-    };
-  }
-
-  public static AvroData getBytes(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public byte[] bytesValue() {
-        return ((ByteBuffer) datum).array();
-      }
-
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
deleted file mode 100644
index 577cf74..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.avro;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.Schema.Field;
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-
-public class AvroSchema implements Schema {
-
-  protected final org.apache.avro.Schema avroSchema;
-  protected final Schema.Type type;
-
-  private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
-      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
-
-  static {
-    primSchemas.put(org.apache.avro.Schema.Type.INT,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getInt(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.LONG,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getLong(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getFloat(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getDouble(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBoolean(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.STRING,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getString(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
-        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBytes(this, datum);
-      }
-    });
-  };
-
-  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
-    Schema.Type type = mapType(schema.getType());
-    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
-      return primSchemas.get(schema.getType());
-    }
-    // otherwise, construct the new schema
-    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
-    switch (type) {
-      case ARRAY:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.ARRAY) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getElementType().equals(this.getElementType())) {
-              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getElementType().getType());
-            }
-            // input type matches array type
-            return AvroData.getArray(this, input.value());
-          }
-        };
-      case MAP:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.MAP) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getValueType().equals(this.getValueType())) {
-              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getValueType().getType());
-            }
-            // input type matches map type
-            return AvroData.getMap(this, input.value());
-          }
-        };
-      case STRUCT:
-        return new AvroSchema(schema) {
-          @SuppressWarnings("serial")
-          private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() {
-            {
-              for (Field field : schema.getFields()) {
-                put(field.name(), getSchema(field.schema()));
-              }
-            }
-          };
-
-          @Override
-          public Map<String, Schema> getFields() {
-            return this.fldSchemas;
-          }
-
-          @Override
-          public Schema getFieldType(String fldName) {
-            return this.fldSchemas.get(fldName);
-          }
-
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current schema's array length
-            if (input.schema().getType() != Schema.Type.STRUCT) {
-              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            // Note: this particular transform function only implements "projection to a sub-set" concept.
-            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
-            for (String fldName : this.fldSchemas.keySet()) {
-              // check each field schema matches input
-              Schema fldSchema = this.fldSchemas.get(fldName);
-              Schema inputFld = input.schema().getFieldType(fldName);
-              if (!fldSchema.equals(inputFld)) {
-                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
-                    + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType());
-              }
-            }
-            // input type matches struct type
-            return AvroData.getStruct(this, input.value());
-          }
-
-        };
-      default:
-        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
-    }
-  }
-
-  private AvroSchema(org.apache.avro.Schema schema) {
-    this.avroSchema = schema;
-    this.type = mapType(schema.getType());
-  }
-
-  private static Type mapType(org.apache.avro.Schema.Type type) {
-    switch (type) {
-      case ARRAY:
-        return Schema.Type.ARRAY;
-      case RECORD:
-        return Schema.Type.STRUCT;
-      case MAP:
-        return Schema.Type.MAP;
-      case INT:
-        return Schema.Type.INTEGER;
-      case LONG:
-        return Schema.Type.LONG;
-      case BOOLEAN:
-        return Schema.Type.BOOLEAN;
-      case FLOAT:
-        return Schema.Type.FLOAT;
-      case DOUBLE:
-        return Schema.Type.DOUBLE;
-      case STRING:
-        return Schema.Type.STRING;
-      case BYTES:
-        return Schema.Type.BYTES;
-      default:
-        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
-    }
-  }
-
-  @Override
-  public Type getType() {
-    return this.type;
-  }
-
-  @Override
-  public Schema getElementType() {
-    if (this.type != Schema.Type.ARRAY) {
-      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getElementType());
-  }
-
-  @Override
-  public Schema getValueType() {
-    if (this.type != Schema.Type.MAP) {
-      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getValueType());
-  }
-
-  @Override
-  public Map<String, Schema> getFields() {
-    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
-  }
-
-  @Override
-  public Schema getFieldType(String fldName) {
-    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
-  }
-
-  @Override
-  public Data read(Object object) {
-    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
-      return AvroData.getArray(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
-      return AvroData.getMap(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
-      return AvroData.getStruct(this, object);
-    }
-    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
-  }
-
-  @Override
-  public Data transform(Data inputData) {
-    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
-        || inputData.schema().getType() == Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
-    }
-    if (inputData.schema().getType() != this.type) {
-      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
-          + ", input type:" + inputData.schema().getType());
-    }
-    return inputData;
-  }
-
-  @Override
-  public boolean equals(Schema other) {
-    // TODO Auto-generated method stub
-    if (this.type != other.getType()) {
-      return false;
-    }
-    switch (this.type) {
-      case ARRAY:
-        // check if element types are the same
-        return this.getElementType().equals(other.getElementType());
-      case MAP:
-        // check if value types are the same
-        return this.getValueType().equals(other.getValueType());
-      case STRUCT:
-        // check if the fields schemas in this equals the other
-        // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform()
-        for (String fieldName : this.getFields().keySet()) {
-          if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
-            return false;
-          }
-        }
-        return true;
-      default:
-        return true;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
deleted file mode 100644
index 1f0c3b2..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.serializers;
-
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.sql.data.string.StringData;
-
-import java.io.UnsupportedEncodingException;
-
-public class SqlStringSerde implements Serde<StringData> {
-
-    private final Serde<String> serde;
-
-    public SqlStringSerde(String encoding) {
-        this.serde = new StringSerde(encoding);
-    }
-
-    @Override
-    public StringData fromBytes(byte[] bytes) {
-          return new StringData(serde.fromBytes(bytes));
-    }
-
-    @Override
-    public byte[] toBytes(StringData object) {
-        return serde.toBytes(object.strValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
deleted file mode 100644
index 2564479..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.serializers;
-
-
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.sql.data.string.StringData;
-
-public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
-    @Override
-    public Serde<StringData> getSerde(String name, Config config) {
-        return new SqlStringSerde(config.get("encoding", "UTF-8"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
deleted file mode 100644
index b81d9fa..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.string;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-import java.util.List;
-import java.util.Map;
-
-public class StringData implements Data {
-    private final Object datum;
-    private final Schema schema;
-
-    public StringData(Object datum) {
-        this.datum = datum;
-        this.schema = new StringSchema();
-    }
-
-    @Override
-    public Schema schema() {
-        return this.schema;
-    }
-
-    @Override
-    public Object value() {
-        return this.datum;
-    }
-
-    @Override
-    public int intValue() {
-        throw new UnsupportedOperationException("Can't get int value for a string type data");
-    }
-
-    @Override
-    public long longValue() {
-        throw new UnsupportedOperationException("Can't get long value for a string type data");
-    }
-
-    @Override
-    public float floatValue() {
-        throw new UnsupportedOperationException("Can't get float value for a string type data");
-    }
-
-    @Override
-    public double doubleValue() {
-        throw new UnsupportedOperationException("Can't get double value for a string type data");
-    }
-
-    @Override
-    public boolean booleanValue() {
-        throw new UnsupportedOperationException("Can't get boolean value for a string type data");
-    }
-
-    @Override
-    public String strValue() {
-        return String.valueOf(datum);
-    }
-
-    @Override
-    public byte[] bytesValue() {
-        throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
-    }
-
-    @Override
-    public List<Object> arrayValue() {
-        throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
-    }
-
-    @Override
-    public Map<Object, Object> mapValue() {
-        throw new UnsupportedOperationException("Can't get mapValue for a string type data");
-    }
-
-    @Override
-    public Data getElement(int index) {
-        throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
-    }
-
-    @Override
-    public Data getFieldData(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
deleted file mode 100644
index 348fc0c..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.string;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-import java.util.Map;
-
-public class StringSchema implements Schema {
-    private Type type = Type.STRING;
-
-    @Override
-    public Type getType() {
-      return Type.STRING;
-    }
-
-    @Override
-    public Schema getElementType() {
-      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
-    }
-
-    @Override
-    public Schema getValueType() {
-        throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
-    }
-
-    @Override
-    public Map<String, Schema> getFields() {
-        throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
-    }
-
-    @Override
-    public Schema getFieldType(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
-    }
-
-    @Override
-    public Data read(Object object) {
-        return new StringData(object);
-    }
-
-    @Override
-    public Data transform(Data inputData) {
-        if (inputData.schema().getType() != this.type) {
-            throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
-                    + ", input type:" + inputData.schema().getType());
-        }
-        return inputData;
-    }
-
-    @Override
-    public boolean equals(Schema other) {
-        return other.getType() == this.type;
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 986d688..7921d4f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -82,8 +82,9 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
 
   @Override
   public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
-        tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value()));
+    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(),
+        null /* TODO: when merge with Schema API changes, use: tuple
+             .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage()));
   }
 
 }


[2/3] samza git commit: Revert "SAMZA-482; create samza-sql module, and add a basic set of non-functional operators into it"

Posted by cr...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
deleted file mode 100644
index b4b0e59..0000000
--- a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-
-
-/**
- * Example implementation of <code>SqlMessageCollector</code> that stores outputs from the operators
- *
- */
-public class StoreMessageCollector implements SqlMessageCollector {
-
-  private final KeyValueStore<EntityName, List<Object>> outputStore;
-
-  public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) {
-    this.outputStore = store;
-  }
-
-  @Override
-  public void send(Relation deltaRelation) throws Exception {
-    saveOutput(deltaRelation.getName(), deltaRelation);
-  }
-
-  @Override
-  public void send(Tuple tuple) throws Exception {
-    saveOutput(tuple.getStreamName(), tuple);
-  }
-
-  @Override
-  public void timeout(List<EntityName> outputs) throws Exception {
-    // TODO Auto-generated method stub
-  }
-
-  public List<Object> removeOutput(EntityName id) {
-    List<Object> output = outputStore.get(id);
-    outputStore.delete(id);
-    return output;
-  }
-
-  private void saveOutput(EntityName name, Object output) {
-    if (this.outputStore.get(name) == null) {
-      this.outputStore.put(name, new ArrayList<Object>());
-    }
-    List<Object> outputs = this.outputStore.get(name);
-    outputs.add(output);
-  }
-
-  @Override
-  public void send(OutgoingMessageEnvelope envelope) {
-    saveOutput(
-        EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":" + envelope.getSystemStream().getStream()),
-        envelope);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
deleted file mode 100644
index 4ec7dbb..0000000
--- a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams together using the following operations:
- * <p>a. the two streams are each processed by a window operator to convert to relations
- * <p>b. a join operator is applied on the two relations to generate join results
- * <p>c. finally, the join results are sent out to the system output
- *
- */
-public class RandomOperatorTask implements StreamTask, InitableTask, WindowableTask {
-  private KeyValueStore<EntityName, List<Object>> opOutputStore;
-  private BoundedTimeWindow wndOp1;
-  private BoundedTimeWindow wndOp2;
-  private Join joinOp;
-
-  private BoundedTimeWindow getWindowOp(EntityName streamName) {
-    if (streamName.equals(EntityName.getStreamName("kafka:stream1"))) {
-      return this.wndOp1;
-    } else if (streamName.equals(EntityName.getStreamName("kafka:stream2"))) {
-      return this.wndOp2;
-    }
-
-    throw new IllegalArgumentException("No window operator found for stream: " + streamName);
-  }
-
-  private void processJoinOutput(List<Object> outputs, MessageCollector collector) {
-    // get each tuple in the join operator's outputs and send it to system stream
-    for (Object joinOutput : outputs) {
-      for (KeyValueIterator<Object, Tuple> iter = ((Relation) joinOutput).all(); iter.hasNext();) {
-        Tuple otuple = iter.next().getValue();
-        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "joinOutput1"), otuple.getKey(), otuple
-            .getMessage()));
-      }
-    }
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    // create the StoreMessageCollector
-    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
-    // construct the input tuple
-    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-
-    // based on tuple's stream name, get the window op and run process()
-    BoundedTimeWindow wndOp = getWindowOp(ituple.getStreamName());
-    wndOp.process(ituple, sqlCollector);
-    List<Object> wndOutputs = sqlCollector.removeOutput(wndOp.getSpec().getOutputNames().get(0));
-    if (wndOutputs.isEmpty()) {
-      return;
-    }
-
-    // process all output from the window operator
-    for (Object input : wndOutputs) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-    // get the output from the join operator and send them
-    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // create the StoreMessageCollector
-    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
-    // trigger timeout event on both window operators
-    this.wndOp1.window(sqlCollector, coordinator);
-    this.wndOp2.window(sqlCollector, coordinator);
-
-    // for all outputs from the window operators, call joinOp.process()
-    for (Object input : sqlCollector.removeOutput(this.wndOp1.getSpec().getOutputNames().get(0))) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-    for (Object input : sqlCollector.removeOutput(this.wndOp2.getSpec().getOutputNames().get(0))) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-
-    // get the output from the join operator and send them
-    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // 1. create a fixed length 10 sec window operator
-    this.wndOp1 = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1");
-    this.wndOp2 = new BoundedTimeWindow("wndOp2", 10, "kafka:stream2", "relation2");
-    // 2. create a join operation
-    List<String> inputRelations = new ArrayList<String>();
-    inputRelations.add("relation1");
-    inputRelations.add("relation2");
-    List<String> joinKeys = new ArrayList<String>();
-    joinKeys.add("key1");
-    joinKeys.add("key2");
-    this.joinOp = new Join("joinOp", inputRelations, "joinOutput", joinKeys);
-    // Finally, initialize all operators
-    this.opOutputStore =
-        (KeyValueStore<EntityName, List<Object>>) context.getStore("samza-sql-operator-output-kvstore");
-    this.wndOp1.init(config, context);
-    this.wndOp2.init(config, context);
-    this.joinOp.init(config, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
deleted file mode 100644
index 4796fa6..0000000
--- a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.factory.SimpleOperatorFactoryImpl;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.operators.window.WindowSpec;
-import org.apache.samza.sql.router.SimpleRouter;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams together using the folowing operations:
- * <ul>
- * <li>a. the two streams are each processed by a window operator to convert to relations
- * <li>b. a join operator is applied on the two relations to generate join results
- * <li>c. an istream operator is applied on join output and convert the relation into a stream
- * <li>d. a partition operator that re-partitions the output stream from istream and send the stream to system output
- * </ul>
- *
- * This example also uses an implementation of <code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>)
- * that uses <code>OperatorRouter</code> to automatically execute the whole paths that connects operators together.
- */
-public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
-
-  private OperatorRouter rteCntx;
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
-    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-    for (Iterator<TupleOperator> iter = this.rteCntx.getTupleOperators(ituple.getStreamName()).iterator(); iter
-        .hasNext();) {
-      iter.next().process(ituple, opCollector);
-    }
-
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
-    for (EntityName entity : this.rteCntx.getSystemInputs()) {
-      for (Iterator<Operator> iter = this.rteCntx.getNextOperators(entity).iterator(); iter.hasNext();) {
-        iter.next().window(opCollector, coordinator);
-      }
-    }
-
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // create specification of all operators first
-    // 1. create 2 window specifications that define 2 windows of fixed length of 10 seconds
-    final WindowSpec spec1 =
-        new WindowSpec("fixedWnd1", EntityName.getStreamName("inputStream1"),
-            EntityName.getRelationName("fixedWndOutput1"), 10);
-    final WindowSpec spec2 =
-        new WindowSpec("fixedWnd2", EntityName.getStreamName("inputStream2"),
-            EntityName.getRelationName("fixedWndOutput2"), 10);
-    // 2. create a join specification that join the output from 2 window operators together
-    @SuppressWarnings("serial")
-    List<EntityName> inputRelations = new ArrayList<EntityName>() {
-      {
-        add(spec1.getOutputName());
-        add(spec2.getOutputName());
-      }
-    };
-    @SuppressWarnings("serial")
-    List<String> joinKeys = new ArrayList<String>() {
-      {
-        add("key1");
-        add("key2");
-      }
-    };
-    JoinSpec joinSpec = new JoinSpec("joinOp", inputRelations, EntityName.getRelationName("joinOutput"), joinKeys);
-    // 3. create the specification of an istream operator that convert the output from join to a stream
-    InsertStreamSpec istrmSpec =
-        new InsertStreamSpec("istremOp", joinSpec.getOutputName(), EntityName.getStreamName("istrmOutput1"));
-    // 4. create the specification of a partition operator that re-partitions the stream based on <code>joinKey</code>
-    PartitionSpec parSpec =
-        new PartitionSpec("parOp1", istrmSpec.getOutputName().getName(), new SystemStream("kafka", "parOutputStrm1"),
-            "joinKey", 50);
-
-    // create all operators via the operator factory
-    // 1. create two window operators
-    SimpleOperatorFactoryImpl operatorFactory = new SimpleOperatorFactoryImpl();
-    BoundedTimeWindow wnd1 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec1);
-    BoundedTimeWindow wnd2 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec2);
-    // 2. create one join operator
-    Join join = (Join) operatorFactory.getRelationOperator(joinSpec);
-    // 3. create one stream operator
-    InsertStream istream = (InsertStream) operatorFactory.getRelationOperator(istrmSpec);
-    // 4. create a re-partition operator
-    PartitionOp par = (PartitionOp) operatorFactory.getTupleOperator(parSpec);
-
-    // Now, connecting the operators via the OperatorRouter
-    this.rteCntx = new SimpleRouter();
-    // 1. set two system input operators (i.e. two window operators)
-    this.rteCntx.addTupleOperator(spec1.getInputName(), wnd1);
-    this.rteCntx.addTupleOperator(spec2.getInputName(), wnd2);
-    // 2. connect join operator to both window operators
-    this.rteCntx.addRelationOperator(spec1.getOutputName(), join);
-    this.rteCntx.addRelationOperator(spec2.getOutputName(), join);
-    // 3. connect stream operator to the join operator
-    this.rteCntx.addRelationOperator(joinSpec.getOutputName(), istream);
-    // 4. connect re-partition operator to the stream operator
-    this.rteCntx.addTupleOperator(istrmSpec.getOutputName(), par);
-    // 5. set the system inputs
-    this.rteCntx.addSystemInput(spec1.getInputName());
-    this.rteCntx.addSystemInput(spec2.getInputName());
-
-    for (Iterator<Operator> iter = this.rteCntx.iterator(); iter.hasNext();) {
-      iter.next().init(config, context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 08e548c..bb07a3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -26,8 +26,7 @@ include \
   'samza-log4j',
   'samza-shell',
   'samza-yarn',
-  'samza-test',
-  'samza-sql'
+  'samza-test'
 
 rootProject.children.each {
   if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {


[3/3] samza git commit: Revert "SAMZA-482; create samza-sql module, and add a basic set of non-functional operators into it"

Posted by cr...@apache.org.
Revert "SAMZA-482; create samza-sql module, and add a basic set of non-functional operators into it"

This reverts commit efb9795a2d5c0a9ea6c7957e3909c79ad94e39b0.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6743df31
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6743df31
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6743df31

Branch: refs/heads/master
Commit: 6743df319fa88bcd3a900a87de94536ba9a6a7d3
Parents: f44a692
Author: Chris Riccomini <cr...@apache.org>
Authored: Thu Feb 12 14:10:38 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Feb 12 14:10:38 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  20 ---
 gradle/dependency-versions.gradle               |   1 -
 samza-sql/README                                |   1 -
 .../apache/samza/sql/api/data/EntityName.java   | 141 ----------------
 .../org/apache/samza/sql/api/data/Relation.java |  47 ------
 .../org/apache/samza/sql/api/data/Tuple.java    |  58 -------
 .../samza/sql/api/operators/Operator.java       |  43 -----
 .../sql/api/operators/RelationOperator.java     |  51 ------
 .../sql/api/operators/SqlOperatorFactory.java   |  51 ------
 .../samza/sql/api/operators/TupleOperator.java  |  47 ------
 .../sql/api/operators/spec/OperatorSpec.java    |  64 --------
 .../samza/sql/api/router/OperatorRouter.java    | 126 ---------------
 .../samza/sql/data/IncomingMessageTuple.java    |  74 ---------
 .../sql/operators/factory/SimpleOperator.java   |  50 ------
 .../factory/SimpleOperatorFactoryImpl.java      |  63 --------
 .../operators/factory/SimpleOperatorSpec.java   | 106 -------------
 .../sql/operators/partition/PartitionOp.java    |  90 -----------
 .../sql/operators/partition/PartitionSpec.java  |  91 -----------
 .../samza/sql/operators/relation/Join.java      | 139 ----------------
 .../samza/sql/operators/relation/JoinSpec.java  |  60 -------
 .../sql/operators/stream/InsertStream.java      |  98 ------------
 .../sql/operators/stream/InsertStreamSpec.java  |  42 -----
 .../sql/operators/window/BoundedTimeWindow.java | 141 ----------------
 .../samza/sql/operators/window/WindowSpec.java  |  67 --------
 .../samza/sql/operators/window/WindowState.java |  44 -----
 .../apache/samza/sql/router/SimpleRouter.java   | 133 ----------------
 .../task/sql/OperatorMessageCollector.java      |  80 ----------
 .../samza/task/sql/SqlMessageCollector.java     |  64 --------
 .../samza/task/sql/StoreMessageCollector.java   |  80 ----------
 .../samza/task/sql/RandomOperatorTask.java      | 151 ------------------
 .../apache/samza/task/sql/StreamSqlTask.java    | 159 -------------------
 settings.gradle                                 |   3 +-
 32 files changed, 1 insertion(+), 2384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e6b10fc..b803276 100644
--- a/build.gradle
+++ b/build.gradle
@@ -246,26 +246,6 @@ project(":samza-yarn_$scalaVersion") {
   jar.dependsOn("lesscss")
 }
 
-project(":samza-sql_$scalaVersion") {
-  apply plugin: 'java'
-
-  configurations {
-    // Remove transitive dependencies from Zookeeper that we don't want.
-    compile.exclude group: 'javax.jms', module: 'jms'
-    compile.exclude group: 'com.sun.jdmk', module: 'jmxtools'
-    compile.exclude group: 'com.sun.jmx', module: 'jmxri'
-  }
-
-  dependencies {
-    compile project(':samza-api')
-    compile project(":samza-core_$scalaVersion")
-    compile project(":samza-kv_$scalaVersion")
-    compile "commons-collections:commons-collections:$commonsCollectionVersion"
-    testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
-  }
-}
-
 project(":samza-shell") {
   apply plugin: 'java'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 6f815b2..84be50b 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -34,5 +34,4 @@
   log4jVersion = "1.2.17"
   guavaVersion = "17.0"
   commonsCodecVersion = "1.9"
-  commonsCollectionVersion = "3.2.1"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/README
----------------------------------------------------------------------
diff --git a/samza-sql/README b/samza-sql/README
deleted file mode 100644
index 65b7558..0000000
--- a/samza-sql/README
+++ /dev/null
@@ -1 +0,0 @@
-samza-sql is an experimental module that is under development (SAMZA-390).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java
deleted file mode 100644
index 127a677..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/EntityName.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. relations and streams.
- */
-public class EntityName {
-  /**
-   * <code>EntityType</code> defines the types of the entity names
-   *
-   */
-  private enum EntityType {
-    RELATION,
-    STREAM
-  };
-
-  /**
-   * Type of the entity name
-   */
-  private final EntityType type;
-
-  /**
-   * Formatted name of the entity.
-   *
-   * <p>This formatted name of the entity should be a unique identifier for the corresponding relation/stream in the system.
-   * e.g. for a Kafka system stream named "mystream", the formatted name should be "kafka:mystream".
-   */
-  private final String name;
-
-  //TODO: we may want to replace the map with Guava cache to allow GC
-  /**
-   * Static map of already allocated relation names
-   */
-  private static Map<String, EntityName> relations = new HashMap<String, EntityName>();
-
-  /**
-   * Static map of already allocated stream names
-   */
-  private static Map<String, EntityName> streams = new HashMap<String, EntityName>();
-
-  /**
-   * Private ctor to create entity names
-   *
-   * @param type Type of the entity name
-   * @param name Formatted name of the entity
-   */
-  private EntityName(EntityType type, String name) {
-    this.type = type;
-    this.name = name;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%s:%s", this.type, this.name);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof EntityName) {
-      EntityName otherEntity = (EntityName) other;
-      return this.type.equals(otherEntity.type) && this.name.equals(otherEntity.name);
-    }
-    return false;
-  }
-
-  /**
-   * Check to see whether this entity name is for a relation
-   *
-   * @return true if the entity type is <code>EntityType.RELATION</code>; false otherwise
-   */
-  public boolean isRelation() {
-    return this.type.equals(EntityType.RELATION);
-  }
-
-  /**
-   * Check to see whether this entity name is for a stream
-   *
-   * @return true if the entity type is <code>EntityType.STREAM</code>; false otherwise
-   */
-  public boolean isStream() {
-    return this.type.equals(EntityType.STREAM);
-  }
-
-  /**
-   * Get the formatted entity name
-   *
-   * @return The formatted entity name
-   */
-  public String getName() {
-    return this.name;
-  }
-
-  /**
-   * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.RELATION</code>
-   *
-   * @param name The formatted entity name of the relation
-   * @return A <code>EntityName</code> for a relation
-   */
-  public static EntityName getRelationName(String name) {
-    if (relations.get(name) == null) {
-      relations.put(name, new EntityName(EntityType.RELATION, name));
-    }
-    return relations.get(name);
-  }
-
-  /**
-   * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.STREAM</code>
-   *
-   * @param name The formatted entity name of the stream
-   * @return A <code>EntityName</code> for a stream
-   */
-  public static EntityName getStreamName(String name) {
-    if (streams.get(name) == null) {
-      streams.put(name, new EntityName(EntityType.STREAM, name));
-    }
-    return streams.get(name);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java
deleted file mode 100644
index 90b8026..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import org.apache.samza.storage.kv.KeyValueStore;
-
-
-/**
- * This class defines the general interface of <code>Relation</code>, which is defined as a map of <code>Tuple</code>.
- *
- * <p>The interface is defined as an extension to <code>KeyValueStore&lt;Object, Tuple&gt;</code>.
- *
- */
-
-public interface Relation extends KeyValueStore<Object, Tuple> {
-
-  /**
-   * Get the primary key field name for this table
-   *
-   * @return The name of the primary key field
-   */
-  String getPrimaryKey();
-
-  /**
-   * Get the name of the relation created by CREATE TABLE
-   *
-   * @return The relation name
-   */
-  EntityName getName();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
deleted file mode 100644
index 0c21a53..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-/**
- * This class defines the generic interface of <code>Tuple</code>, which is an entry from the incoming stream, or one row in a <code>Relation</code>.
- *
- * <p>The <code>Tuple</code> models the basic operable unit in streaming SQL processes in Samza.
- *
- */
-public interface Tuple {
-
-  /**
-   * Access method to get the corresponding message body in the tuple
-   *
-   * @return Message object in the tuple
-   */
-  Object getMessage();
-
-  /**
-   * Method to indicate whether the tuple is a delete tuple or an insert tuple
-   *
-   * @return A boolean value indicating whether the current tuple is a delete or insert message
-   */
-  boolean isDelete();
-
-  /**
-   * Access method to the key of the tuple
-   *
-   * @return The <code>key</code> of the tuple
-   */
-  Object getKey();
-
-  /**
-   * Get the stream name of the tuple. Note this stream name should be unique in the system.
-   *
-   * @return The stream name which this tuple belongs to
-   */
-  EntityName getStreamName();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java
deleted file mode 100644
index 0169f2d..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.WindowableTask;
-
-
-/**
- * This class defines the common interface for operator classes, regardless of the type of their input data.
- *
- * <p> It extends the <code>InitableTask</code> and <code>WindowableTask</code> to reuse the interface methods
- * <code>init</code> and <code>window</code> for initialization and timeout operations
- *
- */
-public interface Operator extends InitableTask, WindowableTask {
-
-  /**
-   * Method to get the specification of this <code>Operator</code>
-   *
-   * @return The <code>OperatorSpec</code> object that defines the configuration/parameters of the operator
-   */
-  OperatorSpec getSpec();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
deleted file mode 100644
index faa0a32..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines the interface <code>RelationOperator</code>.
- *
- * <p>All operators implementing <code>RelationOperator</code> will take a <code>Relation</code> object as input.
- * The SQL operators that need to implement this interface include:
- * <ul>
- * <li>All relation algebra operators, such as: join, select, where, group-by, having, limit, order-by, etc.
- * <li>All relation-to-stream operators, which convert a relation to a stream
- * </ul>
- *
- */
-public interface RelationOperator extends Operator {
-
-  /**
-   * Method to perform a relational algebra on a set of relations, or a relation-to-stream function
-   *
-   * <p> The actual implementation of relational logic is performed by the implementation of this method.
-   * The operator sends its output to the <code>collector</code> object.
-   *
-   * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates
-   * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator
-   * @throws Exception Throws exception if failed
-   */
-  void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
deleted file mode 100644
index 67671b9..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-
-
-/**
- * This class defines the interface of SQL operator factory, which creates the following operators:
- * <ul>
- * <li><code>RelationOperator</code> that takes <code>Relation</code> as input variables
- * <li><code>TupleOperator</code> that takes <code>Tuple</code> as input variables
- * </ul>
- *
- */
-public interface SqlOperatorFactory {
-
-  /**
-   * Interface method to create/get the <code>RelationOperator</code> object
-   *
-   * @param spec The specification of the <code>RelationOperator</code> object
-   * @return The relation operator object
-   */
-  RelationOperator getRelationOperator(OperatorSpec spec);
-
-  /**
-   * Interface method to create/get the <code>TupleOperator</code> object
-   *
-   * @param spec The specification of the <code>TupleOperator</code> object
-   * @return The tuple operator object
-   */
-  TupleOperator getTupleOperator(OperatorSpec spec);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
deleted file mode 100644
index ac4654e..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines the interface class that processes incoming tuples from input stream(s).
- *
- * <p>All operators implementing <code>TupleOperator</code> will take a <code>Tuple</code> object as input.
- * The SQL operators that need to implement this interface include:
- * <ul>
- * <li>All stream-to-relation operators, such as: window operators.
- * <li>All stream-to-stream operators, such as: re-partition, union of two streams
- * </ul>
- *
- */
-public interface TupleOperator extends Operator {
-  /**
-   * Interface method to process on an input tuple.
-   *
-   * @param tuple The input tuple, which has the incoming message from a stream
-   * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator
-   * @throws Exception Throws exception if failed
-   */
-  void process(Tuple tuple, SqlMessageCollector collector) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
deleted file mode 100644
index 96385e2..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators.spec;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-/**
- * This class defines a generic specification interface class for all operators.
- *
- * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator.
- *
- * <p>An operator specification provides generic methods to get the unique ID, the list of entity names (i.e. stream name
- * in <code>Tuple</code> or <code>Relation</code> name) of input variables, and the list of entity names of the output variables.
- *
- */
-public interface OperatorSpec {
-  /**
-   * Interface method that returns the unique ID of the operator in a task
-   *
-   * @return The unique ID of the <code>Operator</code> object
-   */
-  String getId();
-
-  /**
-   * Access method to the list of entity names of input variables.
-   *
-   * <p>The input entity names are either stream names if the operator is a <code>TupleOperator</code>;
-   * or <code>Relation</code> names if the operator is a <code>RelationOperator</code>
-   *
-   * @return A list of entity names of the inputs
-   */
-  List<EntityName> getInputNames();
-
-  /**
-   * Access method to the list of entity names of the output variables
-   *
-   * <p>The output entity name is either a stream name if the operator generates tuples as an output stream;
-   * or <code>Relation</code> names if the operator generates a <code>Relation</code> as output.
-   *
-   * @return A list of entity names of the outputs
-   *
-   */
-  List<EntityName> getOutputNames();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java b/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
deleted file mode 100644
index 2455a62..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.router;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-
-
-/**
- * This interface class defines interface methods to connect operators together.
- *
- * <p>The <code>OperatorRouter</code> allows the user to attach operators to a relation or a stream entity,
- * if the corresponding relation/stream is included as inputs to the operator. Each operator then executes its own logic
- * and determines which relation/stream to emit the output to. Through the <code>OperatorRouter</code>, the next
- * operators attached to the corresponding output entities (i.e. relations/streams) can then be invoked to continue the
- * stream process task.
- *
- * <p>The <code>OperatorRouter</code> also allows the user to set the system input entities (i.e. relations/streams)
- * that are fed into the operators by the system outside the <code>OperatorRouter</code>, not generated by some
- * operators in the <code>OperatorRouter</code>.
- *
- * <p>The methods included in this interface class allow a user to
- * <ul>
- * <li>i)   add operators to an <code>EntityName</code>
- * <li>ii)  get the next operators attached to an <code>EntityName</code>
- * <li>iii) add and get the system input <code>EntityName</code>s
- * <li>iv)  iterate through each and every operator connected via <code>OperatorRouter</code>
- * </ul>
- *
- */
-public interface OperatorRouter {
-
-  /**
-   * This method adds a <code>TupleOperator</code> as one of the input operators.
-   *
-   * @param stream The output stream entity name
-   * @param nextOp The <code>TupleOperator</code> that takes the tuples in the <code>stream</code> as an input.
-   * @throws Exception Throws exception if failed
-   */
-  void addTupleOperator(EntityName stream, TupleOperator nextOp) throws Exception;
-
-  /**
-   * This method adds a <code>RelationOperator</code> as one of the input operators
-
-   * @param relation The input relation entity name
-   * @param nextOp The <code>RelationOperator</code> that takes the <code>relation</code> as an input
-   * @throws Exception Throws exception if failed
-   */
-  void addRelationOperator(EntityName relation, RelationOperator nextOp) throws Exception;
-
-  /**
-   * This method gets the list of <code>RelationOperator</code>s attached to the <code>relation</code>
-   *
-   * @param relation The identifier of the relation entity
-   * @return The list of <code>RelationOperator</code> taking <code>relation</code> as an input variable
-   */
-  List<RelationOperator> getRelationOperators(EntityName relation);
-
-  /**
-   * This method gets the list of <code>TupleOperator</code>s attached to the <code>stream</code>
-   *
-   * @param stream The identifier of the stream entity
-   * @return The list of <code>TupleOperator</code> taking <code>stream</code> as an input variable
-   */
-  List<TupleOperator> getTupleOperators(EntityName stream);
-
-  /**
-   * This method gets the list of <code>Operator</code>s attached to an output entity (of any type)
-   *
-   * @param output The identifier of the output entity
-   * @return The list of <code>Operator</code> taking <code>output</code> as input variables
-   */
-  List<Operator> getNextOperators(EntityName output);
-
-  /**
-   * This method provides an iterator to go through all operators connected via <code>OperatorRouter</code>
-   *
-   * @return An <code>Iterator</code> for all operators connected via <code>OperatorRouter</code>
-   */
-  Iterator<Operator> iterator();
-
-  /**
-   * This method checks to see whether there is any <code>Operator</code> attached to the entity <code>output</code>
-   *
-   * @param output The output entity name
-   * @return True if there is some operator attached to the <code>output</code>; false otherwise
-   */
-  boolean hasNextOperators(EntityName output);
-
-  /**
-   * This method adds an entity as the system input
-   *
-   * @param input The entity name for the system input
-   */
-  void addSystemInput(EntityName input);
-
-  /**
-   * This method returns the list of entities as system inputs
-   *
-   * @return The list of <code>EntityName</code>s as system inputs
-   */
-  List<EntityName> getSystemInputs();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
deleted file mode 100644
index a8a55e2..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.data;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.system.IncomingMessageEnvelope;
-
-
-/**
- * This class implements a <code>Tuple</code> class that encapsulates <code>IncomingMessageEnvelope</code> from the system
- *
- */
-public class IncomingMessageTuple implements Tuple {
-  /**
-   * Incoming message envelope
-   */
-  private final IncomingMessageEnvelope imsg;
-
-  /**
-   * The entity name for the incoming system stream
-   */
-  private final EntityName strmEntity;
-
-  /**
-   * Ctor to create a <code>IncomingMessageTuple</code> from <code>IncomingMessageEnvelope</code>
-   *
-   * @param imsg The incoming system message
-   */
-  public IncomingMessageTuple(IncomingMessageEnvelope imsg) {
-    this.imsg = imsg;
-    this.strmEntity =
-        EntityName.getStreamName(String.format("%s:%s", imsg.getSystemStreamPartition().getSystem(), imsg
-            .getSystemStreamPartition().getStream()));
-  }
-
-  // TODO: the return type should be changed to the generic data type
-  @Override
-  public Object getMessage() {
-    return this.imsg.getMessage();
-  }
-
-  @Override
-  public boolean isDelete() {
-    return false;
-  }
-
-  @Override
-  public Object getKey() {
-    return imsg.getKey();
-  }
-
-  @Override
-  public EntityName getStreamName() {
-    return this.strmEntity;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
deleted file mode 100644
index c634159..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all operator classes should implement.
- *
- */
-public abstract class SimpleOperator implements Operator {
-  /**
-   * The specification of this operator
-   */
-  private final OperatorSpec spec;
-
-  /**
-   * Ctor of <code>SimpleOperator</code> class
-   *
-   * @param spec The specification of this operator
-   */
-  public SimpleOperator(OperatorSpec spec) {
-    this.spec = spec;
-  }
-
-  @Override
-  public OperatorSpec getSpec() {
-    return this.spec;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
deleted file mode 100644
index 916b166..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.SqlOperatorFactory;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.operators.window.WindowSpec;
-
-
-/**
- * This simple factory class provides method to create the build-in operators per operator specification.
- * It can be extended when the build-in operators expand.
- *
- */
-public class SimpleOperatorFactoryImpl implements SqlOperatorFactory {
-
-  @Override
-  public RelationOperator getRelationOperator(OperatorSpec spec) {
-    if (spec instanceof JoinSpec) {
-      return new Join((JoinSpec) spec);
-    } else if (spec instanceof InsertStreamSpec) {
-      return new InsertStream((InsertStreamSpec) spec);
-    }
-    throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
-  }
-
-  @Override
-  public TupleOperator getTupleOperator(OperatorSpec spec) {
-    if (spec instanceof WindowSpec) {
-      return new BoundedTimeWindow((WindowSpec) spec);
-    } else if (spec instanceof PartitionSpec) {
-      return new PartitionOp((PartitionSpec) spec);
-    }
-    throw new UnsupportedOperationException("Unsupported operator specified" + spec.getClass().getCanonicalName());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
deleted file mode 100644
index 93d4ebb..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.sql.operators.factory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all specification of operators should implement.
- *
- */
-public abstract class SimpleOperatorSpec implements OperatorSpec {
-  /**
-   * The identifier of the corresponding operator
-   */
-  private final String id;
-
-  /**
-   * The list of input entity names of the corresponding operator
-   */
-  private final List<EntityName> inputs = new ArrayList<EntityName>();
-
-  /**
-   * The list of output entity names of the corresponding operator
-   */
-  private final List<EntityName> outputs = new ArrayList<EntityName>();
-
-  /**
-   * Ctor of the <code>SimpleOperatorSpec</code> for simple <code>Operator</code>s w/ one input and one output
-   *
-   * @param id Unique identifier of the <code>Operator</code> object
-   * @param input The only input entity
-   * @param output The only output entity
-   */
-  public SimpleOperatorSpec(String id, EntityName input, EntityName output) {
-    this.id = id;
-    this.inputs.add(input);
-    this.outputs.add(output);
-  }
-
-  /**
-   * Ctor of <code>SimpleOperatorSpec</code> with general format: m inputs and n outputs
-   *
-   * @param id Unique identifier of the <code>Operator</code> object
-   * @param inputs The list of input entities
-   * @param output The list of output entities
-   */
-  public SimpleOperatorSpec(String id, List<EntityName> inputs, EntityName output) {
-    this.id = id;
-    this.inputs.addAll(inputs);
-    this.outputs.add(output);
-  }
-
-  @Override
-  public String getId() {
-    return this.id;
-  }
-
-  @Override
-  public List<EntityName> getInputNames() {
-    return this.inputs;
-  }
-
-  @Override
-  public List<EntityName> getOutputNames() {
-    return this.outputs;
-  }
-
-  /**
-   * Method to get the first output entity
-   *
-   * @return The first output entity name
-   */
-  public EntityName getOutputName() {
-    return this.outputs.get(0);
-  }
-
-  /**
-   * Method to get the first input entity
-   *
-   * @return The first input entity name
-   */
-  public EntityName getInputName() {
-    return this.inputs.get(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
deleted file mode 100644
index 7921d4f..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.partition;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This is an example build-in operator that performs a simple stream re-partition operation.
- *
- */
-public final class PartitionOp extends SimpleOperator implements TupleOperator {
-
-  /**
-   * The specification of this <code>PartitionOp</code>
-   *
-   */
-  private final PartitionSpec spec;
-
-  /**
-   * Ctor that takes the <code>PartitionSpec</code> object as input.
-   *
-   * @param spec The <code>PartitionSpec</code> object
-   */
-  public PartitionOp(PartitionSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * A simplified constructor that allow users to randomly create <code>PartitionOp</code>
-   *
-   * @param id The identifier of this operator
-   * @param input The input stream name of this operator
-   * @param system The output system name of this operator
-   * @param output The output stream name of this operator
-   * @param parKey The partition key used for the output stream
-   * @param parNum The number of partitions used for the output stream
-   */
-  public PartitionOp(String id, String input, String system, String output, String parKey, int parNum) {
-    super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum));
-    this.spec = (PartitionSpec) super.getSpec();
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // TODO Auto-generated method stub
-    // No need to initialize store since all inputs are immediately send out
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // TODO Auto-generated method stub
-    // NOOP or flush
-  }
-
-  @Override
-  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(),
-        null /* TODO: when merge with Schema API changes, use: tuple
-             .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
deleted file mode 100644
index 29d1784..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.partition;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * This class defines the specification class of <code>PartitionOp</code> operator
- *
- */
-public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
-
-  /**
-   * The partition key name
-   */
-  private final String parKey;
-
-  /**
-   * The number of partitions
-   */
-  private final int parNum;
-
-  /**
-   * The <code>SystemStream</code> to send the partition output to
-   */
-  private final SystemStream sysStream;
-
-  /**
-   * Ctor to create the <code>PartitionSpec</code>
-   *
-   * @param id The ID of the <code>PartitionOp</code>
-   * @param input The input stream name
-   * @param output The output <code>SystemStream</code> object
-   * @param parKey The name of the partition key
-   * @param parNum The number of partitions
-   */
-  public PartitionSpec(String id, String input, SystemStream output, String parKey, int parNum) {
-    super(id, EntityName.getStreamName(input), EntityName.getStreamName(output.getSystem() + ":" + output.getStream()));
-    this.parKey = parKey;
-    this.parNum = parNum;
-    this.sysStream = output;
-  }
-
-  /**
-   * Method to get the partition key name
-   *
-   * @return The partition key name
-   */
-  public String getParKey() {
-    return this.parKey;
-  }
-
-  /**
-   * Method to get the number of partitions
-   *
-   * @return The number of partitions
-   */
-  public int getParNum() {
-    return this.parNum;
-  }
-
-  /**
-   * Method to get the output <code>SystemStream</code>
-   *
-   * @return The output system stream object
-   */
-  public SystemStream getSystemStream() {
-    return this.sysStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java
deleted file mode 100644
index a8a6eaf..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.relation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines an example build-in operator for a join operator between two relations.
- *
- */
-public class Join extends SimpleOperator implements RelationOperator {
-
-  private final JoinSpec spec;
-
-  /**
-   * The input relations
-   *
-   */
-  private List<Relation> inputs = null;
-
-  /**
-   * The output relation
-   */
-  private Relation output = null;
-
-  /**
-   * Ctor that creates <code>Join</code> operator based on the specification.
-   *
-   * @param spec The <code>JoinSpec</code> object that specifies the join operator
-   */
-  public Join(JoinSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * An alternative ctor that allows users to create a join operator randomly.
-   *
-   * @param id The identifier of the join operator
-   * @param joinIns The list of input relation names of the join
-   * @param joinOut The output relation name of the join
-   * @param joinKeys The list of keys used in the join. Each entry in the <code>joinKeys</code> is the key name used in one of the input relations.
-   *     The order of the <code>joinKeys</code> MUST be the same as their corresponding relation names in <code>joinIns</code>
-   */
-  @SuppressWarnings("serial")
-  public Join(final String id, final List<String> joinIns, final String joinOut, final List<String> joinKeys) {
-    super(new JoinSpec(id, new ArrayList<EntityName>() {
-      {
-        for (String name : joinIns) {
-          add(EntityName.getRelationName(name));
-        }
-      }
-    }, EntityName.getRelationName(joinOut), joinKeys));
-    this.spec = (JoinSpec) this.getSpec();
-  }
-
-  private boolean hasPendingChanges() {
-    return getPendingChanges() != null;
-  }
-
-  private Relation getPendingChanges() {
-    // TODO Auto-generated method stub
-    // return any pending changes that have not been processed yet
-    return null;
-  }
-
-  private Relation getOutputChanges() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private boolean hasOutputChanges() {
-    // TODO Auto-generated method stub
-    return getOutputChanges() != null;
-  }
-
-  private void join(Relation deltaRelation) {
-    // TODO Auto-generated method stub
-    // implement the join logic
-    // 1. calculate the delta changes in <code>output</code>
-    // 2. check output condition to see whether the current input should trigger an output
-    // 3. set the output changes and pending changes
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    for (EntityName relation : this.spec.getInputNames()) {
-      inputs.add((Relation) context.getStore(relation.toString()));
-    }
-    this.output = (Relation) context.getStore(this.spec.getOutputName().toString());
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
-    if (hasPendingChanges()) {
-      sqlCollector.send(getPendingChanges());
-    }
-    sqlCollector.timeout(this.spec.getOutputNames());
-  }
-
-  @Override
-  public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
-    // calculate join based on the input <code>deltaRelation</code>
-    join(deltaRelation);
-    if (hasOutputChanges()) {
-      collector.send(getOutputChanges());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
deleted file mode 100644
index ba8bfb5..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.relation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * This class implements specification class for the build-in <code>Join</code> operator
- */
-public class JoinSpec extends SimpleOperatorSpec implements OperatorSpec {
-  /**
-   * Join keys defined for each input relation
-   */
-  private final List<String> joinKeys = new ArrayList<String>();
-
-  /**
-   * Default ctor for the <code>JoinSpec</code>
-   *
-   * @param id Unique ID of the <code>Join</code> object
-   * @param joinIns The list of input relations
-   * @param joinOut The output relation
-   * @param joinKeys The list of join keys in input relations
-   */
-  public JoinSpec(String id, List<EntityName> joinIns, EntityName joinOut, List<String> joinKeys) {
-    super(id, joinIns, joinOut);
-    this.joinKeys.addAll(joinKeys);
-  }
-
-  /**
-   * Method to get the list of join keys
-   *
-   * @return The list of join keys
-   */
-  public List<String> getJoinKeys() {
-    return this.joinKeys;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
deleted file mode 100644
index 7563100..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.stream;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines an example build-in operator for an istream operator that converts a relation to a stream
- *
- */
-public class InsertStream extends SimpleOperator implements RelationOperator {
-  /**
-   * The <code>InsertStreamSpec</code> for this operator
-   */
-  private final InsertStreamSpec spec;
-
-  /**
-   * The time-varying relation that is to be converted into a stream
-   */
-  private Relation relation = null;
-
-  /**
-   * Ctor that takes the specication of the object as input parameter
-   *
-   * <p>This version of constructor is often used in an implementation of <code>SqlOperatorFactory</code>
-   *
-   * @param spec The <code>InsertStreamSpec</code> specification of this operator
-   */
-  public InsertStream(InsertStreamSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * An alternative ctor that allow users to create an <code>InsertStream</code> object randomly
-   *
-   * @param id The identifier of the <code>InsertStream</code> object
-   * @param input The input relation
-   * @param output The output stream
-   */
-  public InsertStream(String id, String input, String output) {
-    super(new InsertStreamSpec(id, EntityName.getRelationName(input), EntityName.getStreamName(output)));
-    this.spec = (InsertStreamSpec) super.getSpec();
-  }
-
-  @Override
-  public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
-    KeyValueIterator<Object, Tuple> iterator = deltaRelation.all();
-    for (; iterator.hasNext();) {
-      Tuple tuple = iterator.next().getValue();
-      if (!tuple.isDelete()) {
-        collector.send(tuple);
-      }
-    }
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    if (this.relation == null) {
-      this.relation = (Relation) context.getStore(this.spec.getInputName().toString());
-    }
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // TODO Auto-generated method stub
-    // assuming this operation does not have pending changes kept in memory
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
deleted file mode 100644
index 70475ce..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.stream;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * Example implementation of specification of <code>InsertStream</code> operator
- */
-public class InsertStreamSpec extends SimpleOperatorSpec implements OperatorSpec {
-
-  /**
-   * Default ctor of <code>InsertStreamSpec</code>
-   *
-   * @param id The identifier of the operator
-   * @param input The input relation entity
-   * @param output The output stream entity
-   */
-  public InsertStreamSpec(String id, EntityName input, EntityName output) {
-    super(id, input, output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
deleted file mode 100644
index 935ffc0..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.window;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines an example built-in operator: a fixed-size window operator that converts a stream to a relation
- *
- */
-public class BoundedTimeWindow extends SimpleOperator implements TupleOperator {
-
-  /**
-   * The specification of this window operator
-   */
-  private final WindowSpec spec;
-
-  /**
-   * The relation that the window operator keeps internally
-   */
-  private Relation relation = null;
-
-  /**
-   * The list of window states of all active windows that the window operator keeps track of
-   */
-  private List<WindowState> windowStates = null;
-
-  /**
-   * Ctor that takes <code>WindowSpec</code> specification as input argument
-   *
-   * <p>This version of the constructor is often used in an implementation of <code>SqlOperatorFactory</code>
-   *
-   * @param spec The window specification object
-   */
-  public BoundedTimeWindow(WindowSpec spec) {
-    super(spec);
-    this.spec = spec;
-  }
-
-  /**
-   * A simplified version of the ctor that allows users to create a window operator directly, w/o a spec object
-   *
-   * @param wndId The identifier of this window operator
-   * @param lengthSec The window size in seconds
-   * @param input The input stream name
-   * @param output The output relation name
-   */
-  public BoundedTimeWindow(String wndId, int lengthSec, String input, String output) {
-    super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getRelationName(output), lengthSec));
-    this.spec = (WindowSpec) super.getSpec();
-  }
-
-  @Override
-  public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    // for each tuple, this will evaluate the incoming tuple and update the window states.
-    // If the window states allow generating output, calculate the delta changes in
-    // the window relation and execute the relation operation <code>nextOp</code>
-    updateWindow(tuple);
-    processWindowChanges(collector);
-  }
-
-  private void processWindowChanges(SqlMessageCollector collector) throws Exception {
-    if (windowStateChange()) {
-      collector.send(getWindowChanges());
-    }
-  }
-
-  private Relation getWindowChanges() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private boolean windowStateChange() {
-    // TODO Auto-generated method stub
-    return getWindowChanges() != null;
-  }
-
-  private void updateWindow(Tuple tuple) {
-    // TODO Auto-generated method stub
-    // The window states are updated here
-    // And the corresponding deltaChanges is also calculated here.
-  }
-
-  private void updateWindowTimeout() {
-    // TODO Auto-generated method stub
-    // The window states are updated here
-    // And the corresponding deltaChanges is also calculated here.
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
-    updateWindowTimeout();
-    processWindowChanges(sqlCollector);
-    sqlCollector.timeout(this.spec.getOutputNames());
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // TODO Auto-generated method stub
-    if (this.relation == null) {
-      this.relation = (Relation) context.getStore(this.spec.getOutputName().toString());
-      Relation wndStates = (Relation) context.getStore(this.spec.getWndStatesName());
-      this.windowStates = new ArrayList<WindowState>();
-      for (KeyValueIterator<Object, Tuple> iter = wndStates.all(); iter.hasNext();) {
-        this.windowStates.add((WindowState) iter.next().getValue().getMessage());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
deleted file mode 100644
index e2ae3aa..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.window;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
-
-
-/**
- * This class implements the specification for the built-in <code>BoundedTimeWindow</code> operator
- */
-public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
-
-  /**
-   * The window size in seconds
-   */
-  private final int wndSizeSec;
-
-  /**
-   * Default ctor of the <code>WindowSpec</code> object
-   *
-   * @param id The identifier of the operator
-   * @param input The input stream entity
-   * @param output The output relation entity
-   * @param lengthSec The window size in seconds
-   */
-  public WindowSpec(String id, EntityName input, EntityName output, int lengthSec) {
-    super(id, input, output);
-    this.wndSizeSec = lengthSec;
-  }
-
-  /**
-   * Method to get the window state relation name
-   *
-   * @return The window state relation name
-   */
-  public String getWndStatesName() {
-    return this.getId() + "-wnd-state";
-  }
-
-  /**
-   * Method to get the window size in seconds
-   *
-   * @return The window size in seconds
-   */
-  public int getWndSizeSec() {
-    return this.wndSizeSec;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
deleted file mode 100644
index 48547f0..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.window;
-
-public class WindowState {
-  public String startOffset = null;
-  public String endOffset = null;
-  public boolean isClosed = false;
-
-  public void open(String offset) {
-    this.isClosed = false;
-    this.startOffset = offset;
-  }
-
-  public void close(String offset) {
-    this.endOffset = offset;
-    this.isClosed = true;
-  }
-
-  public void advanceTo(String offset) {
-    this.endOffset = offset;
-  }
-
-  public boolean isClosed() {
-    return this.isClosed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java b/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
deleted file mode 100644
index c6fc673..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/router/SimpleRouter.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.router;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-
-
-/**
- * Example implementation of <code>OperatorRouter</code>
- *
- */
-public class SimpleRouter implements OperatorRouter {
-  /**
-   * List of operators added to the <code>OperatorRouter</code>
-   */
-  private List<Operator> operators = new ArrayList<Operator>();
-
-  @SuppressWarnings("rawtypes")
-  /**
-   * Map of <code>EntityName</code> to the list of operators associated with it
-   */
-  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
-
-  /**
-   * List of <code>EntityName</code> as system inputs
-   */
-  private List<EntityName> inputEntities = new ArrayList<EntityName>();
-
-  @SuppressWarnings("unchecked")
-  private void addOperator(EntityName output, Operator nextOp) {
-    if (nextOps.get(output) == null) {
-      nextOps.put(output, new ArrayList<Operator>());
-    }
-    nextOps.get(output).add(nextOp);
-    operators.add(nextOp);
-
-  }
-
-  @Override
-  public Iterator<Operator> iterator() {
-    return operators.iterator();
-  }
-
-  @Override
-  public void addTupleOperator(EntityName outputStream, TupleOperator nextOp) throws Exception {
-    if (!outputStream.isStream()) {
-      throw new IllegalArgumentException("Can't attach an TupleOperator " + nextOp.getSpec().getId()
-          + " to a non-stream entity " + outputStream);
-    }
-    addOperator(outputStream, nextOp);
-  }
-
-  @Override
-  public void addRelationOperator(EntityName outputRelation, RelationOperator nextOp) throws Exception {
-    if (!outputRelation.isRelation()) {
-      throw new IllegalArgumentException("Can't attach an RelationOperator " + nextOp.getSpec().getId()
-          + " to a non-relation entity " + outputRelation);
-    }
-    addOperator(outputRelation, nextOp);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<RelationOperator> getRelationOperators(EntityName outputRelation) {
-    if (!outputRelation.isRelation()) {
-      throw new IllegalArgumentException("Can't get RelationOperators for a non-relation output: " + outputRelation);
-    }
-    return nextOps.get(outputRelation);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<TupleOperator> getTupleOperators(EntityName outputStream) {
-    if (!outputStream.isStream()) {
-      throw new IllegalArgumentException("Can't get TupleOperators for a non-stream output: " + outputStream);
-    }
-    return nextOps.get(outputStream);
-  }
-
-  @Override
-  public boolean hasNextOperators(EntityName output) {
-    return nextOps.get(output) != null && !nextOps.get(output).isEmpty();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<Operator> getNextOperators(EntityName output) {
-    return nextOps.get(output);
-  }
-
-  @Override
-  public void addSystemInput(EntityName input) {
-    if (!nextOps.containsKey(input) || nextOps.get(input).isEmpty()) {
-      throw new IllegalStateException("Can't set a system input w/o any next operators. input:" + input);
-    }
-    if (!inputEntities.contains(input)) {
-      inputEntities.add(input);
-    }
-  }
-
-  @Override
-  public List<EntityName> getSystemInputs() {
-    return this.inputEntities;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
deleted file mode 100644
index 1e5310f..0000000
--- a/samza-sql/src/main/java/org/apache/samza/task/sql/OperatorMessageCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Example implementation of a <code>SqlMessageCollector</code> that uses <code>OperatorRouter</code>
- *
- */
-public class OperatorMessageCollector implements SqlMessageCollector {
-
-  private final MessageCollector collector;
-  private final TaskCoordinator coordinator;
-  private final OperatorRouter rteCntx;
-
-  public OperatorMessageCollector(MessageCollector collector, TaskCoordinator coordinator, OperatorRouter rteCntx) {
-    this.collector = collector;
-    this.coordinator = coordinator;
-    this.rteCntx = rteCntx;
-  }
-
-  @Override
-  public void send(Relation deltaRelation) throws Exception {
-    for (RelationOperator op : this.rteCntx.getRelationOperators(deltaRelation.getName())) {
-      op.process(deltaRelation, this);
-    }
-  }
-
-  @Override
-  public void send(Tuple tuple) throws Exception {
-    for (TupleOperator op : this.rteCntx.getTupleOperators(tuple.getStreamName())) {
-      op.process(tuple, this);
-    }
-  }
-
-  @Override
-  public void timeout(List<EntityName> outputs) throws Exception {
-    for (EntityName output : outputs) {
-      for (Operator op : this.rteCntx.getNextOperators(output)) {
-        op.window(this, this.coordinator);
-      }
-    }
-  }
-
-  @Override
-  public void send(OutgoingMessageEnvelope envelope) {
-    this.collector.send(envelope);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
deleted file mode 100644
index b98e2d7..0000000
--- a/samza-sql/src/main/java/org/apache/samza/task/sql/SqlMessageCollector.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.MessageCollector;
-
-
-/**
- * This interface is used by the operators to send their output via runtime system resources,
- * such as the output system streams, the system storage, or <code>OperatorRouter</code>.
- *
- */
-public interface SqlMessageCollector extends MessageCollector {
-
-  /**
-   * This method allows the current operator to send its relation output to the next operators
-   *
-   * @param deltaRelation The delta <code>Relation</code> output generated by the current operator
-   * @throws Exception Throws exception if failed
-   */
-  void send(Relation deltaRelation) throws Exception;
-
-  /**
-   * This method allows the current operator to send its tuple output to the next operators
-   *
-   * @param tuple The <code>Tuple</code> object generated by the current operator
-   * @throws Exception Throws exception if failed
-   */
-  void send(Tuple tuple) throws Exception;
-
-  /**
-   * This method allows the current operator to trigger timeout actions via the <code>SqlMessageCollector</code>.
-   *
-   * <p>This method sets timeout events to the corresponding <code>outputEntities</code> s.t. the next operators
-   * attached to those entities will be notified of the timeout.
-   *
-   * @param outputEntities The list of output entities to which the timeout event needs to be sent
-   * @throws Exception Throws exception if failed
-   */
-  void timeout(List<EntityName> outputEntities) throws Exception;
-
-}