You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/05/16 04:02:57 UTC
[2/2] samza git commit: SAMZA-552 update operator APIs
SAMZA-552 update operator APIs
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/41c4cd01
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/41c4cd01
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/41c4cd01
Branch: refs/heads/samza-sql
Commit: 41c4cd0124b21a49bf92cc73ac2a5acd1b21712f
Parents: b47e47f
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Fri May 15 19:02:34 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Fri May 15 19:02:34 2015 -0700
----------------------------------------------------------------------
.../apache/samza/sql/api/data/EntityName.java | 33 ++--
.../org/apache/samza/sql/api/data/Relation.java | 15 +-
.../org/apache/samza/sql/api/data/Stream.java | 40 +++++
.../org/apache/samza/sql/api/data/Table.java | 38 +++++
.../org/apache/samza/sql/api/data/Tuple.java | 19 ++-
.../samza/sql/api/operators/Operator.java | 58 +++++--
.../sql/api/operators/OperatorCallback.java | 70 +++++++++
.../samza/sql/api/operators/OperatorRouter.java | 54 +++++++
.../samza/sql/api/operators/OperatorSpec.java | 58 +++++++
.../sql/api/operators/RelationOperator.java | 51 -------
.../samza/sql/api/operators/SimpleOperator.java | 34 +++++
.../sql/api/operators/SqlOperatorFactory.java | 24 +--
.../samza/sql/api/operators/TupleOperator.java | 47 ------
.../sql/api/operators/spec/OperatorSpec.java | 64 --------
.../samza/sql/api/router/OperatorRouter.java | 126 ----------------
.../samza/sql/data/IncomingMessageTuple.java | 27 +++-
.../operators/factory/NoopOperatorCallback.java | 50 ++++++
.../sql/operators/factory/SimpleOperator.java | 50 ------
.../factory/SimpleOperatorFactoryImpl.java | 34 ++---
.../operators/factory/SimpleOperatorImpl.java | 136 +++++++++++++++++
.../operators/factory/SimpleOperatorSpec.java | 12 +-
.../sql/operators/factory/SimpleRouter.java | 136 +++++++++++++++++
.../sql/operators/join/StreamStreamJoin.java | 117 ++++++++++++++
.../operators/join/StreamStreamJoinSpec.java | 38 +++++
.../sql/operators/partition/PartitionOp.java | 55 +++++--
.../sql/operators/partition/PartitionSpec.java | 12 +-
.../samza/sql/operators/relation/Join.java | 139 -----------------
.../samza/sql/operators/relation/JoinSpec.java | 60 --------
.../sql/operators/stream/InsertStream.java | 98 ------------
.../sql/operators/stream/InsertStreamSpec.java | 42 ------
.../sql/operators/window/BoundedTimeWindow.java | 68 ++++++---
.../samza/sql/operators/window/WindowSpec.java | 6 +-
.../apache/samza/sql/router/SimpleRouter.java | 133 ----------------
.../sql/window/storage/OrderedStoreKey.java | 26 ++++
.../org/apache/samza/system/sql/LongOffset.java | 66 ++++++++
.../org/apache/samza/system/sql/Offset.java | 27 ++++
.../task/sql/OperatorMessageCollector.java | 80 ----------
.../samza/task/sql/RouterMessageCollector.java | 56 +++++++
.../samza/task/sql/SimpleMessageCollector.java | 114 ++++++++++++++
.../samza/task/sql/SqlMessageCollector.java | 64 --------
.../samza/task/sql/StoreMessageCollector.java | 80 ----------
.../samza/task/sql/RandomOperatorTask.java | 151 -------------------
.../task/sql/RandomWindowOperatorTask.java | 96 ++++++++++++
.../apache/samza/task/sql/StreamSqlTask.java | 97 +++---------
.../samza/task/sql/UserCallbacksSqlTask.java | 150 ++++++++++++++++++
45 files changed, 1550 insertions(+), 1401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
index 127a677..80ba455 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
@@ -24,15 +24,15 @@ import java.util.Map;
/**
- * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. relations and streams.
+ * This class defines the name scheme for the collective data entities in Samza Stream SQL, i.e. tables and streams.
*/
public class EntityName {
/**
- * <code>EntityType</code> defines the types of the entity names
+ * {@code EntityType} defines the types of the entity names
*
*/
private enum EntityType {
- RELATION,
+ TABLE,
STREAM
};
@@ -44,16 +44,15 @@ public class EntityName {
/**
* Formatted name of the entity.
*
- * <p>This formatted name of the entity should be unique identifier for the corresponding relation/stream in the system.
+ * <p>This formatted name of the entity should be a unique identifier for the corresponding table/stream in the system.
* e.g. for a Kafka system stream named "mystream", the formatted name should be "kafka:mystream".
*/
private final String name;
- //TODO: we may want to replace the map with Guava cache to allow GC
/**
- * Static map of already allocated relation names
+ * Static map of already allocated table names
*/
- private static Map<String, EntityName> relations = new HashMap<String, EntityName>();
+ private static Map<String, EntityName> tables = new HashMap<String, EntityName>();
/**
* Static map of already allocated stream names
@@ -86,18 +85,18 @@ public class EntityName {
}
/**
- * Check to see whether this entity name is for a relation
+ * Check to see whether this entity name is for a table
*
- * @return true if the entity type is <code>EntityType.RELATION</code>; false otherwise
+ * @return true if the entity type is {@code EntityType.TABLE}; false otherwise
*/
- public boolean isRelation() {
- return this.type.equals(EntityType.RELATION);
+ public boolean isTable() {
+ return this.type.equals(EntityType.TABLE);
}
/**
* Check to see whether this entity name is for a stream
*
- * @return true if the entity type is <code>EntityType.STREAM</code>; false otherwise
+ * @return true if the entity type is {@code EntityType.STREAM}; false otherwise
*/
public boolean isStream() {
return this.type.equals(EntityType.STREAM);
@@ -113,16 +112,16 @@ public class EntityName {
}
/**
- * Static method to get the instance of <code>EntityName</code> with type <code>EntityType.RELATION</code>
+ * Static method to get the instance of {@code EntityName} with type {@code EntityType.TABLE}
*
* @param name The formatted entity name of the relation
* @return A <code>EntityName</code> for a relation
*/
- public static EntityName getRelationName(String name) {
- if (relations.get(name) == null) {
- relations.put(name, new EntityName(EntityType.RELATION, name));
+ public static EntityName getTableName(String name) {
+ if (tables.get(name) == null) {
+ tables.put(name, new EntityName(EntityType.TABLE, name));
}
- return relations.get(name);
+ return tables.get(name);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
index 90b8026..72816a3 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java
@@ -23,23 +23,16 @@ import org.apache.samza.storage.kv.KeyValueStore;
/**
- * This class defines the general interface of <code>Relation</code>, which is defined as a map of <code>Tuple</code>.
+ * This class defines the general interface of {@code Relation}, which is defined as a map of {@link org.apache.samza.sql.api.data.Tuple}.
*
- * <p>The interface is defined as an extension to <code>KeyValueStore<Object, Tuple></code>.
+ * <p>The interface is defined as an extension to {@link org.apache.samza.storage.kv.KeyValueStore}.
*
*/
-public interface Relation extends KeyValueStore<Object, Tuple> {
+public interface Relation<K> extends KeyValueStore<K, Tuple> {
/**
- * Get the primary key field name for this table
- *
- * @return The name of the primary key field
- */
- String getPrimaryKey();
-
- /**
- * Get the name of the relation created by CREATE TABLE
+ * Get the name of the relation
*
* @return The relation name
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
new file mode 100644
index 0000000..931705e
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.List;
+
+
+/**
+ * This interface defines an ordered {@link org.apache.samza.sql.api.data.Relation}, which has an ordered key.
+ *
+ * <p> This is to define a stream created by CREATE STREAM statement
+ *
+ * @param <K> The ordered key for the {@code Stream} class
+ */
+public interface Stream<K extends Comparable<?>> extends Relation<K> {
+ /**
+ * Get the list of field names used as the order keys for this stream
+ *
+ * @return The list of field names used to construct the order key for the stream
+ */
+ List<String> getOrderFields();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
new file mode 100644
index 0000000..7b4d984
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+/**
+ * This interface defines a non-ordered {@link org.apache.samza.sql.api.data.Relation}, which has a unique primary key
+ *
+ * <p> This is to define a table created by CREATE TABLE statement
+ *
+ * @param <K> The primary key for the {@code Table} class
+ */
+public interface Table<K> extends Relation<K> {
+
+ /**
+ * Get the primary key field name for this table
+ *
+ * @return The name of the primary key field
+ */
+ String getPrimaryKeyName();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
index bc8efcf..bea922b 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -19,6 +19,9 @@
package org.apache.samza.sql.api.data;
+import org.apache.samza.system.sql.Offset;
+
+
/**
* This class defines the generic interface of <code>Tuple</code>, which is a entry from the incoming stream, or one row in a <code>Relation</code>.
*
@@ -53,6 +56,20 @@ public interface Tuple {
*
* @return The stream name which this tuple belongs to
*/
- EntityName getStreamName();
+ EntityName getEntityName();
+
+ /**
+ * Get the message creation timestamp of the tuple.
+ *
+ * @return The tuple's creation timestamp in nanoseconds.
+ */
+ long getCreateTimeNano();
+
+ /**
+ * Get the offset of the tuple in the stream. This should be used to uniquely identify a tuple in a stream.
+ *
+ * @return The offset of the tuple in the stream.
+ */
+ Offset getOffset();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
index 0169f2d..d6f6b57 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
@@ -19,25 +19,55 @@
package org.apache.samza.sql.api.operators;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.WindowableTask;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
-/**
- * This class defines the common interface for operator classes, no matter what input data are.
- *
- * <p> It extends the <code>InitableTask</code> and <code>WindowableTask</code> to reuse the interface methods
- * <code>init</code> and <code>window</code> for initialization and timeout operations
- *
- */
-public interface Operator extends InitableTask, WindowableTask {
+public interface Operator {
+ /**
+ * Method to initialize the operator
+ *
+ * @param config The configuration object
+ * @param context The task context
+ * @throws Exception Throws Exception if failed to initialize the operator
+ */
+ void init(Config config, TaskContext context) throws Exception;
+
+ /**
+ * Method to perform relational logic on the input relation
+ *
+ * <p> The actual relational logic is supplied by the implementation of this method.
+ *
+ * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates
+ * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+ * @throws Exception Throws exception if failed
+ */
+ void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
+ throws Exception;
+
+ /**
+ * Method to process an input tuple.
+ *
+ * @param tuple The input tuple, which has the incoming message from a stream
+ * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+ * @throws Exception Throws exception if failed
+ */
+ void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
/**
- * Method to the specification of this <code>Operator</code>
+ * Method to refresh the result when a timer expires
*
- * @return The <code>OperatorSpec</code> object that defines the configuration/parameters of the operator
+ * @param timeNano The current system time in nanoseconds
+ * @param collector The {@link org.apache.samza.task.MessageCollector} that accepts outputs from the operator
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+ * @throws Exception Throws exception if failed
*/
- OperatorSpec getSpec();
+ void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
new file mode 100644
index 0000000..fb2aa89
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.api.operators;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Defines the callback functions that allow customized functions to be invoked before an input is processed and before the result is sent
+ */
+public interface OperatorCallback {
+ /**
+ * Method to be invoked before the operator actually processes the input tuple
+ *
+ * @param tuple The incoming tuple
+ * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+ * @return The tuple to be processed; return {@code null} if there is nothing to be processed
+ */
+ Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator);
+
+ /**
+ * Method to be invoked before the operator actually processes the input relation
+ *
+ * @param rel The input relation
+ * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+ * @return The relation to be processed; return {@code null} if there is nothing to be processed
+ */
+ Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator);
+
+ /**
+ * Method to be invoked before the operator's output tuple is sent
+ *
+ * @param tuple The output tuple
+ * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+ * @return The tuple to be sent; return {@code null} if there is nothing to be sent
+ */
+ Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator);
+
+ /**
+ * Method to be invoked before the operator's output relation is sent
+ *
+ * @param rel The output relation
+ * @param collector The {@link org.apache.samza.task.MessageCollector} in context
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in context
+ * @return The relation to be sent; return {@code null} if there is nothing to be sent
+ */
+ Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
new file mode 100644
index 0000000..0759638
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+
+
+/**
+ * This interface defines the methods to connect {@link org.apache.samza.sql.api.operators.SimpleOperator}s together into a composite operator.
+ *
+ * <p>The {@code OperatorRouter} allows the user to attach operators to a {@link org.apache.samza.sql.api.data.Table} or
+ * a {@link org.apache.samza.sql.api.data.Stream} entity, if the corresponding table/stream is included as an input to the operator.
+ * Each operator then executes its own logic and determines which table/stream to emit the output to. Through the {@code OperatorRouter},
+ * the next operators attached to the corresponding output entities (i.e. tables/streams) can then be invoked to continue the
+ * stream processing task.
+ */
+public interface OperatorRouter extends Operator {
+
+ /**
+ * This method adds a {@link org.apache.samza.sql.api.operators.SimpleOperator} to the {@code OperatorRouter}.
+ *
+ * @param nextOp The {@link org.apache.samza.sql.api.operators.SimpleOperator} to be added
+ * @throws Exception Throws exception if failed
+ */
+ void addOperator(SimpleOperator nextOp) throws Exception;
+
+ /**
+ * This method gets the list of {@link org.apache.samza.sql.api.operators.SimpleOperator}s attached to an output entity (of any type)
+ *
+ * @param output The identifier of the output entity
+ * @return The list of {@link org.apache.samza.sql.api.operators.SimpleOperator} taking {@code output} as input table/stream
+ */
+ List<SimpleOperator> getNextOperators(EntityName output);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
new file mode 100644
index 0000000..4d670fd
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+
+
+/**
+ * This interface defines a generic specification for all {@link org.apache.samza.sql.api.operators.SimpleOperator}s.
+ *
+ * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator.
+ *
+ * <p>The generic methods of an operator specification provide access to the unique ID, the list of entity names (i.e.
+ * {@link org.apache.samza.sql.api.data.Table} names or {@link org.apache.samza.sql.api.data.Stream} names) of the input variables, and the list of entity names of the output variables.
+ *
+ */
+public interface OperatorSpec {
+ /**
+ * Interface method that returns the unique ID of the operator in a task
+ *
+ * @return The unique ID of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
+ */
+ String getId();
+
+ /**
+ * Access method to the list of entity names of input variables.
+ *
+ * @return A list of entity names of the inputs
+ */
+ List<EntityName> getInputNames();
+
+ /**
+ * Access method to the list of entity names of the output variables
+ *
+ * @return A list of entity names of the outputs
+ *
+ */
+ List<EntityName> getOutputNames();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
deleted file mode 100644
index faa0a32..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines the interface <code>RelationOperator</code>.
- *
- * <p>All operators implementing <code>RelationOperator</code> will take a <code>Relation</code> object as input.
- * The SQL operators that need to implement this interface include:
- * <ul>
- * <li>All relation algebra operators, such as: join, select, where, group-by, having, limit, order-by, etc.
- * <li>All relation-to-stream operators, which converts a relation to a stream
- * </ul>
- *
- */
-public interface RelationOperator extends Operator {
-
- /**
- * Method to perform a relational algebra on a set of relations, or a relation-to-stream function
- *
- * <p> The actual implementation of relational logic is performed by the implementation of this method.
- * The <code>collector</code> object is used by the operator to send their output to
- *
- * @param deltaRelation The changed rows in the input relation, including the inserts/deletes/updates
- * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator
- * @throws Exception Throws exception if failed
- */
- void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
new file mode 100644
index 0000000..c49a822
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.operators;
+
+
+
+/**
+ * The interface for a {@code SimpleOperator} that implements a simple primitive relational logic operation
+ */
+public interface SimpleOperator extends Operator {
+ /**
+ * Method to get the specification of this {@code SimpleOperator}
+ *
+ * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} object that defines the configuration/parameters of the operator
+ */
+ OperatorSpec getSpec();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
index 67671b9..6f8d93b 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
@@ -19,33 +19,19 @@
package org.apache.samza.sql.api.operators;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
/**
- * This class defines the interface of SQL operator factory, which creates the following operators:
- * <ul>
- * <li><code>RelationOperator</code> that takes <code>Relation</code> as input variables
- * <li><code>TupleOperator</code> that takes <code>Tuple</code> as input variables
- * </ul>
- *
+ * This class defines the interface of the SQL operator factory, which creates {@link org.apache.samza.sql.api.operators.SimpleOperator}s.
*/
public interface SqlOperatorFactory {
/**
- * Interface method to create/get the <code>RelationOperator</code> object
- *
- * @param spec The specification of the <code>RelationOperator</code> object
- * @return The relation operator object
- */
- RelationOperator getRelationOperator(OperatorSpec spec);
-
- /**
- * Interface method to create/get the <code>TupleOperator</code> object
+ * Interface method to create/get the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
*
- * @param spec The specification of the <code>TupleOperator</code> object
- * @return The tuple operator object
+ * @param spec The specification of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
+ * @return The {@link org.apache.samza.sql.api.operators.SimpleOperator} object
*/
- TupleOperator getTupleOperator(OperatorSpec spec);
+ SimpleOperator getOperator(OperatorSpec spec);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
deleted file mode 100644
index ac4654e..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators;
-
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines the interface class that processes incoming tuples from input stream(s).
- *
- * <p>All operators implementing <code>TupleOperator</code> will take a <code>Tuple</code> object as input.
- * The SQL operators that need to implement this interface include:
- * <ul>
- * <li>All stream-to-relation operators, such as: window operators.
- * <li>All stream-to-stream operators, such as: re-partition, union of two streams
- * </ul>
- *
- */
-public interface TupleOperator extends Operator {
- /**
- * Interface method to process on an input tuple.
- *
- * @param tuple The input tuple, which has the incoming message from a stream
- * @param collector The <code>SqlMessageCollector</code> object that accepts outputs from the operator
- * @throws Exception Throws exception if failed
- */
- void process(Tuple tuple, SqlMessageCollector collector) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
deleted file mode 100644
index 96385e2..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.operators.spec;
-
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-
-
-/**
- * This class defines a generic specification interface class for all operators.
- *
- * <p>The purpose of this class is to encapsulate all the details of configuration/parameters of a specific implementation of an operator.
- *
- * <p>The generic methods for an operator specification is to provide methods to get the unique ID, the list of entity names (i.e. stream name
- * in <code>Tuple</code> or <code>Relation</code> name) of input variables , and the list of entity names of the output variables.
- *
- */
-public interface OperatorSpec {
- /**
- * Interface method that returns the unique ID of the operator in a task
- *
- * @return The unique ID of the <code>Operator</code> object
- */
- String getId();
-
- /**
- * Access method to the list of entity names of input variables.
- *
- * <p>The input entity names are either stream names if the operator is a <code>TupleOperator</code>;
- * or <code>Relation</code> names if the operator is a <code>RelationOperator</code>
- *
- * @return A list of entity names of the inputs
- */
- List<EntityName> getInputNames();
-
- /**
- * Access method to the list of entity name of the output variable
- *
- * <p>The output entity name is either a stream name if the operator generates tuples as an output stream;
- * or <code>Relation</code> names if the operator generates a <code>Relation</code> as output.
- *
- * @return The entity name of the output
- *
- */
- List<EntityName> getOutputNames();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
deleted file mode 100644
index 2455a62..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/router/OperatorRouter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.router;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-
-
-/**
- * This interface class defines interface methods to connect operators together.
- *
- * <p>The <code>OperatorRouter</code> allows the user to attach operators to a relation or a stream entity,
- * if the corresponding relation/stream is included as inputs to the operator. Each operator then executes its own logic
- * and determines which relation/stream to emit the output to. Through the <code>OperatorRouter</code>, the next
- * operators attached to the corresponding output entities (i.e. relations/streams) can then be invoked to continue the
- * stream process task.
- *
- * <p>The <code>OperatorRouter</code> also allows the user to set the system input entities (i.e. relations/streams)
- * that are fed into the operators by the system outside the <code>OperatorRouter</code>, not generated by some
- * operators in the <code>OperatorRouter</code>.
- *
- * <p>The methods included in this interface class allow a user to
- * <ul>
- * <li>i) add operators to an <code>EntityName</code>
- * <li>ii) get the next operators attached to an <code>EntityName</code>
- * <li>iii) add and get the system input <code>EntityName</code>s
- * <li>iv) iterate through each and every operator connected via <code>OperatorRouter</code>
- * </ul>
- *
- */
-public interface OperatorRouter {
-
- /**
- * This method adds a <code>TupleOperator</code> as one of the input operators.
- *
- * @param stream The output stream entity name
- * @param nextOp The <code>TupleOperator</code> that takes the tuples in the <code>stream</code> as an input.
- * @throws Exception Throws exception if failed
- */
- void addTupleOperator(EntityName stream, TupleOperator nextOp) throws Exception;
-
- /**
- * This method adds a <code>RelationOperator</code> as one of the input operators
-
- * @param relation The input relation entity name
- * @param nextOp The <code>RelationOperator</code> that takes the <code>relation</code> as an input
- * @throws Exception Throws exception if failed
- */
- void addRelationOperator(EntityName relation, RelationOperator nextOp) throws Exception;
-
- /**
- * This method gets the list of <code>RelationOperator</code>s attached to the <code>relation</code>
- *
- * @param relation The identifier of the relation entity
- * @return The list of <code>RelationOperator</code> taking <code>relation</code> as an input variable
- */
- List<RelationOperator> getRelationOperators(EntityName relation);
-
- /**
- * This method gets the list of <code>TupleOperator</code>s attached to the <code>stream</code>
- *
- * @param stream The identifier of the stream entity
- * @return The list of <code>TupleOperator</code> taking <code>stream</code> as an input variable
- */
- List<TupleOperator> getTupleOperators(EntityName stream);
-
- /**
- * This method gets the list of <code>Operator</code>s attached to an output entity (of any type)
- *
- * @param output The identifier of the output entity
- * @return The list of <code>Operator</code> taking <code>output</code> as input variables
- */
- List<Operator> getNextOperators(EntityName output);
-
- /**
- * This method provides an iterator to go through all operators connected via <code>OperatorRouter</code>
- *
- * @return An <code>Iterator</code> for all operators connected via <code>OperatorRouter</code>
- */
- Iterator<Operator> iterator();
-
- /**
- * This method checks to see whether there is any <code>Operator</code> attached to the entity <code>output</code>
- *
- * @param output The output entity name
- * @return True if there is some operator attached to the <code>output</code>; false otherwise
- */
- boolean hasNextOperators(EntityName output);
-
- /**
- * This method adds an entity as the system input
- *
- * @param input The entity name for the system input
- */
- void addSystemInput(EntityName input);
-
- /**
- * This method returns the list of entities as system inputs
- *
- * @return The list of <code>EntityName</code>s as system inputs
- */
- List<EntityName> getSystemInputs();
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index f868e5c..72a59f2 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -22,10 +22,12 @@ import org.apache.samza.sql.api.data.Data;
import org.apache.samza.sql.api.data.EntityName;
import org.apache.samza.sql.api.data.Tuple;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.sql.LongOffset;
+import org.apache.samza.system.sql.Offset;
/**
- * This class implements a <code>Tuple</code> class that encapsulates <code>IncomingMessageEnvelope</code> from the system
+ * This class implements a {@link org.apache.samza.sql.api.data.Tuple} that encapsulates an {@link org.apache.samza.system.IncomingMessageEnvelope} from the system
*
*/
public class IncomingMessageTuple implements Tuple {
@@ -40,7 +42,12 @@ public class IncomingMessageTuple implements Tuple {
private final EntityName strmEntity;
/**
- * Ctor to create a <code>IncomingMessageTuple</code> from <code>IncomingMessageEnvelope</code>
+ * The receive time of this incoming message
+ */
+ private final long recvTimeNano;
+
+ /**
+ * Ctor to create a {@code IncomingMessageTuple} from {@link org.apache.samza.system.IncomingMessageEnvelope}
*
* @param imsg The incoming system message
*/
@@ -49,9 +56,9 @@ public class IncomingMessageTuple implements Tuple {
this.strmEntity =
EntityName.getStreamName(String.format("%s:%s", imsg.getSystemStreamPartition().getSystem(), imsg
.getSystemStreamPartition().getStream()));
+ this.recvTimeNano = System.nanoTime();
}
- // TODO: the return type should be changed to the generic data type
@Override
public Data getMessage() {
return (Data) this.imsg.getMessage();
@@ -68,8 +75,20 @@ public class IncomingMessageTuple implements Tuple {
}
@Override
- public EntityName getStreamName() {
+ public EntityName getEntityName() {
return this.strmEntity;
}
+ @Override
+ public long getCreateTimeNano() {
+ // TODO: this is wrong and is kept only as a placeholder. It should be replaced by the message publish time when the publish timestamp is available in the message metadata
+ return this.recvTimeNano;
+ }
+
+ @Override
+ public Offset getOffset() {
+ // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
+ // assuming incoming message carries long value as offset (i.e. Kafka case)
+ return new LongOffset(this.imsg.getOffset());
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
new file mode 100644
index 0000000..c3d2266
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+public final class NoopOperatorCallback implements OperatorCallback {
+
+ @Override
+ public Tuple beforeProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+ return tuple;
+ }
+
+ @Override
+ public Relation beforeProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+ return rel;
+ }
+
+ @Override
+ public Tuple afterProcess(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) {
+ return tuple;
+ }
+
+ @Override
+ public Relation afterProcess(Relation rel, MessageCollector collector, TaskCoordinator coordinator) {
+ return rel;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
deleted file mode 100644
index c634159..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.factory;
-
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
-
-
-/**
- * An abstract class that encapsulate the basic information and methods that all operator classes should implement.
- *
- */
-public abstract class SimpleOperator implements Operator {
- /**
- * The specification of this operator
- */
- private final OperatorSpec spec;
-
- /**
- * Ctor of <code>SimpleOperator</code> class
- *
- * @param spec The specification of this operator
- */
- public SimpleOperator(OperatorSpec spec) {
- this.spec = spec;
- }
-
- @Override
- public OperatorSpec getSpec() {
- return this.spec;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
index 916b166..cbc84d0 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
@@ -19,16 +19,13 @@
package org.apache.samza.sql.operators.factory;
-import org.apache.samza.sql.api.operators.RelationOperator;
+import org.apache.samza.sql.api.operators.OperatorSpec;
+import org.apache.samza.sql.api.operators.SimpleOperator;
import org.apache.samza.sql.api.operators.SqlOperatorFactory;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.operators.join.StreamStreamJoin;
+import org.apache.samza.sql.operators.join.StreamStreamJoinSpec;
import org.apache.samza.sql.operators.partition.PartitionOp;
import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
import org.apache.samza.sql.operators.window.BoundedTimeWindow;
import org.apache.samza.sql.operators.window.WindowSpec;
@@ -41,23 +38,14 @@ import org.apache.samza.sql.operators.window.WindowSpec;
public class SimpleOperatorFactoryImpl implements SqlOperatorFactory {
@Override
- public RelationOperator getRelationOperator(OperatorSpec spec) {
- if (spec instanceof JoinSpec) {
- return new Join((JoinSpec) spec);
- } else if (spec instanceof InsertStreamSpec) {
- return new InsertStream((InsertStreamSpec) spec);
- }
- throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
- }
-
- @Override
- public TupleOperator getTupleOperator(OperatorSpec spec) {
- if (spec instanceof WindowSpec) {
- return new BoundedTimeWindow((WindowSpec) spec);
- } else if (spec instanceof PartitionSpec) {
+ public SimpleOperator getOperator(OperatorSpec spec) {
+ if (spec instanceof PartitionSpec) {
return new PartitionOp((PartitionSpec) spec);
+ } else if (spec instanceof StreamStreamJoinSpec) {
+ return new StreamStreamJoin((StreamStreamJoinSpec) spec);
+ } else if (spec instanceof WindowSpec) {
+ return new BoundedTimeWindow((WindowSpec) spec);
}
- throw new UnsupportedOperationException("Unsupported operator specified" + spec.getClass().getCanonicalName());
+ throw new UnsupportedOperationException("Unsupported operator specified: " + spec.getClass().getCanonicalName());
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
new file mode 100644
index 0000000..e66451f
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.api.operators.OperatorSpec;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SimpleMessageCollector;
+
+
+/**
+ * An abstract class that encapsulates the basic information and methods that all operator classes should implement.
+ * It implements the interface {@link org.apache.samza.sql.api.operators.SimpleOperator}
+ *
+ */
+public abstract class SimpleOperatorImpl implements SimpleOperator {
+ /**
+ * The specification of this operator
+ */
+ private final OperatorSpec spec;
+
+ /**
+ * The callback function
+ */
+ private final OperatorCallback callback;
+
+ /**
+ * Ctor of {@code SimpleOperatorImpl} class
+ *
+ * @param spec The specification of this operator
+ */
+ public SimpleOperatorImpl(OperatorSpec spec) {
+ this(spec, new NoopOperatorCallback());
+ }
+
+ public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) {
+ this.spec = spec;
+ this.callback = callback;
+ }
+
+ @Override
+ public OperatorSpec getSpec() {
+ return this.spec;
+ }
+
+ /**
+ * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, MessageCollector, TaskCoordinator)}
+ * and real processing of the input relation is fixed.
+ */
+ @Override
+ final public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ Relation rel = this.callback.beforeProcess(deltaRelation, collector, coordinator);
+ if (rel == null) {
+ return;
+ }
+ this.realProcess(rel, getCollector(collector, coordinator), coordinator);
+ }
+
+ /**
+ * This method is made final s.t. the sequence of invocations between {@link org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, MessageCollector, TaskCoordinator)}
+ * and real processing of the input tuple is fixed.
+ */
+ @Override
+ final public void process(Tuple tuple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ Tuple ituple = this.callback.beforeProcess(tuple, collector, coordinator);
+ if (ituple == null) {
+ return;
+ }
+ this.realProcess(ituple, getCollector(collector, coordinator), coordinator);
+ }
+
+ /**
+ * This method is made final s.t. we enforce the invocation of {@code SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before doing anything further
+ */
+ @Override
+ final public void refresh(long timeNano, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ this.realRefresh(timeNano, getCollector(collector, coordinator), coordinator);
+ }
+
+ private SimpleMessageCollector getCollector(MessageCollector collector, TaskCoordinator coordinator) {
+ if (!(collector instanceof SimpleMessageCollector)) {
+ return new SimpleMessageCollector(collector, coordinator, this.callback);
+ } else {
+ ((SimpleMessageCollector) collector).switchOperatorCallback(this.callback);
+ return (SimpleMessageCollector) collector;
+ }
+ }
+
+ /**
+ * Method to be overridden by each specific implementation class of operator to handle timeout event
+ *
+ * @param timeNano The time in nanosecond when the timeout event occurred
+ * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+ * @throws Exception Throws exception if failed to refresh the results
+ */
+ protected abstract void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception;
+
+ /**
+ * Method to be overridden by each specific implementation class of operator to perform relational logic operation on an input {@link org.apache.samza.sql.api.data.Relation}
+ *
+ * @param rel The input relation
+ * @param collector The {@link org.apache.samza.task.sql.SimpleMessageCollector} in the context
+ * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
+ * @throws Exception
+ */
+ protected abstract void realProcess(Relation rel, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception;
+
+ protected abstract void realProcess(Tuple ituple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
index 93d4ebb..56753b6 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
@@ -22,12 +22,12 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.api.operators.OperatorSpec;
/**
* An abstract class that encapsulate the basic information and methods that all specification of operators should implement.
- *
+ * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
*/
public abstract class SimpleOperatorSpec implements OperatorSpec {
/**
@@ -46,9 +46,9 @@ public abstract class SimpleOperatorSpec implements OperatorSpec {
private final List<EntityName> outputs = new ArrayList<EntityName>();
/**
- * Ctor of the <code>SimpleOperatorSpec</code> for simple <code>Operator</code>s w/ one input and one output
+ * Ctor of the {@code SimpleOperatorSpec} for simple {@link org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one output
*
- * @param id Unique identifier of the <code>Operator</code> object
+ * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
* @param input The only input entity
* @param output The only output entity
*/
@@ -59,9 +59,9 @@ public abstract class SimpleOperatorSpec implements OperatorSpec {
}
/**
- * Ctor of <code>SimpleOperatorSpec</code> with general format: m inputs and n outputs
+ * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and n outputs
*
- * @param id Unique identifier of the <code>Operator</code> object
+ * @param id Unique identifier of the {@link org.apache.samza.sql.api.operators.SimpleOperator} object
* @param inputs The list of input entities
* @param output The list of output entities
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
new file mode 100644
index 0000000..e570897
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.factory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.Operator;
+import org.apache.samza.sql.api.operators.OperatorRouter;
+import org.apache.samza.sql.api.operators.SimpleOperator;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.RouterMessageCollector;
+
+
+/**
+ * Example implementation of {@link org.apache.samza.sql.api.operators.OperatorRouter}
+ *
+ */
+public final class SimpleRouter implements OperatorRouter {
+ /**
+ * List of operators added to the {@link org.apache.samza.sql.api.operators.OperatorRouter}
+ */
+ private List<SimpleOperator> operators = new ArrayList<SimpleOperator>();
+
+ @SuppressWarnings("rawtypes")
+ /**
+ * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list of operators associated with it
+ */
+ private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
+
+ /**
+ * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to this {@code SimpleRouter}
+ */
+ private Set<EntityName> inputEntities = new HashSet<EntityName>();
+
+ /**
+ * Set of {@link org.apache.samza.sql.api.data.EntityName} produced as outputs by operators added to this {@code SimpleRouter}; these are never treated as router inputs
+ */
+ private Set<EntityName> outputEntities = new HashSet<EntityName>();
+
+ @SuppressWarnings("unchecked")
+ private void addOperator(EntityName input, SimpleOperator nextOp) {
+ if (nextOps.get(input) == null) {
+ nextOps.put(input, new ArrayList<Operator>());
+ }
+ nextOps.get(input).add(nextOp);
+ operators.add(nextOp);
+ // get the operator spec
+ for (EntityName output : nextOp.getSpec().getOutputNames()) {
+ if (inputEntities.contains(output)) {
+ inputEntities.remove(output);
+ }
+ outputEntities.add(output);
+ }
+ if (!outputEntities.contains(input)) {
+ inputEntities.add(input);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List<SimpleOperator> getNextOperators(EntityName entity) {
+ return nextOps.get(entity);
+ }
+
+ @Override
+ public void addOperator(SimpleOperator nextOp) {
+ List<EntityName> inputs = nextOp.getSpec().getInputNames();
+ for (EntityName input : inputs) {
+ addOperator(input, nextOp);
+ }
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) throws Exception {
+ for (SimpleOperator op : this.operators) {
+ op.init(config, context);
+ }
+ }
+
+ @Override
+ public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+ for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
+ iter.next().process(ituple, opCollector, coordinator);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+ for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) {
+ iter.next().process(deltaRelation, opCollector, coordinator);
+ }
+ }
+
+ @Override
+ public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
+ for (EntityName entity : inputEntities) {
+ for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) {
+ iter.next().refresh(nanoSec, opCollector, coordinator);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
new file mode 100644
index 0000000..2854aeb
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.join;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.api.data.Relation;
+import org.apache.samza.sql.api.data.Stream;
+import org.apache.samza.sql.api.data.Tuple;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
+import org.apache.samza.sql.operators.window.BoundedTimeWindow;
+import org.apache.samza.sql.window.storage.OrderedStoreKey;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.sql.SimpleMessageCollector;
+
+
+/**
+ * This class implements a simple stream-to-stream join
+ */
+public class StreamStreamJoin extends SimpleOperatorImpl {
+ private final StreamStreamJoinSpec spec;
+
+ private Map<EntityName, BoundedTimeWindow> inputWindows = new HashMap<EntityName, BoundedTimeWindow>();
+
+ public StreamStreamJoin(StreamStreamJoinSpec spec) {
+ super(spec);
+ this.spec = spec;
+ }
+
+ //TODO: stub constructor to allow compilation to pass. Need to construct a real StreamStreamJoinSpec.
+ public StreamStreamJoin(String opId, List<String> inputRelations, String output, List<String> joinKeys) {
+ this(null);
+ }
+
+ //TODO: stub constructor to allow compilation to pass. Need to construct a real StreamStreamJoinSpec.
+ public StreamStreamJoin(String opId, List<String> inputRelations, String output, List<String> joinKeys,
+ OperatorCallback callback) {
+ super(null, callback);
+ this.spec = null;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) throws Exception {
+ // TODO Auto-generated method stub
+ // initialize the inputWindows map
+
+ }
+
+ private void join(Tuple tuple, Map<EntityName, Stream> joinSets) {
+ // TODO Auto-generated method stub
+ // Do M-way joins if necessary; they should be ordered based on the order of the input relations in inputs
+ // NOTE: inner joins may be optimized by re-ordering the input relations so that inputs w/ fewer join sets are joined first. We will consider it later.
+
+ }
+
+ private Map<EntityName, Stream> findJoinSets(Tuple tuple) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private KeyValueIterator<OrderedStoreKey, Tuple> getJoinSet(Tuple tuple, EntityName strmName) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private List<Entry<String, Object>> getEqualFields(Tuple tuple, EntityName strmName) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ protected void realProcess(Relation deltaRelation, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ // TODO Auto-generated method stub
+ Map<EntityName, Stream> joinSets = findJoinSets(tuple);
+ join(tuple, joinSets);
+ }
+
+ @Override
+ public void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
new file mode 100644
index 0000000..cc0aca0
--- /dev/null
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.operators.join;
+
+import java.util.List;
+
+import org.apache.samza.sql.api.data.EntityName;
+import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
+
+
+/**
+ * This class defines the specification of a {@link org.apache.samza.sql.operators.join.StreamStreamJoin} operator
+ */
+public class StreamStreamJoinSpec extends SimpleOperatorSpec {
+
+ public StreamStreamJoinSpec(String id, List<EntityName> inputs, EntityName output, List<String> joinKeys) {
+ super(id, inputs, output);
+ // TODO Auto-generated constructor stub
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 986d688..b93d789 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -20,31 +20,33 @@
package org.apache.samza.sql.operators.partition;
import org.apache.samza.config.Config;
+import org.apache.samza.sql.api.data.Relation;
import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
+import org.apache.samza.sql.api.operators.OperatorCallback;
+import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
+import org.apache.samza.task.sql.SimpleMessageCollector;
/**
* This is an example build-in operator that performs a simple stream re-partition operation.
*
*/
-public final class PartitionOp extends SimpleOperator implements TupleOperator {
+public class PartitionOp extends SimpleOperatorImpl {
/**
- * The specification of this <code>PartitionOp</code>
+ * The specification of this {@code PartitionOp}
*
*/
private final PartitionSpec spec;
/**
- * Ctor that takes the <code>PartitionSpec</code> object as input.
+ * Ctor that takes the {@link org.apache.samza.sql.operators.partition.PartitionSpec} object as input.
*
* @param spec The <code>PartitionSpec</code> object
*/
@@ -64,7 +66,23 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
* @param parNum The number of partitions used for the output stream
*/
public PartitionOp(String id, String input, String system, String output, String parKey, int parNum) {
- super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum));
+ this(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum));
+ }
+
+ /**
+ * A simplified constructor that allows users to create a <code>PartitionOp</code> directly from its parameters, with an operator callback
+ *
+ * @param id The identifier of this operator
+ * @param input The input stream name of this operator
+ * @param system The output system name of this operator
+ * @param output The output stream name of this operator
+ * @param parKey The partition key used for the output stream
+ * @param parNum The number of partitions used for the output stream
+ * @param callback The callback functions for operator
+ */
+ public PartitionOp(String id, String input, String system, String output, String parKey, int parNum,
+ OperatorCallback callback) {
+ super(new PartitionSpec(id, input, new SystemStream(system, output), parKey, parNum), callback);
this.spec = (PartitionSpec) super.getSpec();
}
@@ -75,15 +93,28 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
}
@Override
- public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ protected void realRefresh(long timeNano, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
// TODO Auto-generated method stub
// NOOP or flush
}
@Override
- public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
- collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
- tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value()));
+ protected void realProcess(Tuple tuple, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getMessage()
+ .getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getKey().value(), tuple.getMessage().value()));
+ }
+
+ @Override
+ protected void realProcess(Relation deltaRelation, SimpleMessageCollector collector, TaskCoordinator coordinator)
+ throws Exception {
+ for(KeyValueIterator<?, Tuple> iter = deltaRelation.all(); iter.hasNext(); ) {
+ Entry<?, Tuple> entry = iter.next();
+ collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), entry.getValue().getMessage()
+ .getFieldData(PartitionOp.this.spec.getParKey()).value(), entry.getValue().getKey().value(), entry.getValue()
+ .getMessage().value()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
index 29d1784..c47eed9 100644
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
+++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
@@ -20,13 +20,13 @@
package org.apache.samza.sql.operators.partition;
import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.spec.OperatorSpec;
+import org.apache.samza.sql.api.operators.OperatorSpec;
import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
import org.apache.samza.system.SystemStream;
/**
- * This class defines the specification class of <code>PartitionOp</code> operator
+ * This class defines the specification class of {@link org.apache.samza.sql.operators.partition.PartitionOp}
*
*/
public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
@@ -47,11 +47,11 @@ public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
private final SystemStream sysStream;
/**
- * Ctor to create the <code>PartitionSpec</code>
+ * Ctor to create the {@code PartitionSpec}
*
- * @param id The ID of the <code>PartitionOp</code>
+ * @param id The ID of the {@link org.apache.samza.sql.operators.partition.PartitionOp}
* @param input The input stream name
- * @param output The output <code>SystemStream</code> object
+ * @param output The output {@link org.apache.samza.system.SystemStream} object
* @param parKey The name of the partition key
* @param parNum The number of partitions
*/
@@ -81,7 +81,7 @@ public class PartitionSpec extends SimpleOperatorSpec implements OperatorSpec {
}
/**
- * Method to get the output <code>SystemStream</code>
+ * Method to get the output {@link org.apache.samza.system.SystemStream}
*
* @return The output system stream object
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/41c4cd01/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
----------------------------------------------------------------------
diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
deleted file mode 100644
index a8a6eaf..0000000
--- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/relation/Join.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.operators.relation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.operators.RelationOperator;
-import org.apache.samza.sql.operators.factory.SimpleOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.sql.SqlMessageCollector;
-
-
-/**
- * This class defines an example build-in operator for a join operator between two relations.
- *
- */
-public class Join extends SimpleOperator implements RelationOperator {
-
- private final JoinSpec spec;
-
- /**
- * The input relations
- *
- */
- private List<Relation> inputs = null;
-
- /**
- * The output relation
- */
- private Relation output = null;
-
- /**
- * Ctor that creates <code>Join</code> operator based on the specification.
- *
- * @param spec The <code>JoinSpec</code> object that specifies the join operator
- */
- public Join(JoinSpec spec) {
- super(spec);
- this.spec = spec;
- }
-
- /**
- * An alternative ctor that allows users to create a join operator randomly.
- *
- * @param id The identifier of the join operator
- * @param joinIns The list of input relation names of the join
- * @param joinOut The output relation name of the join
- * @param joinKeys The list of keys used in the join. Each entry in the <code>joinKeys</code> is the key name used in one of the input relations.
- * The order of the <code>joinKeys</code> MUST be the same as their corresponding relation names in <code>joinIns</code>
- */
- @SuppressWarnings("serial")
- public Join(final String id, final List<String> joinIns, final String joinOut, final List<String> joinKeys) {
- super(new JoinSpec(id, new ArrayList<EntityName>() {
- {
- for (String name : joinIns) {
- add(EntityName.getRelationName(name));
- }
- }
- }, EntityName.getRelationName(joinOut), joinKeys));
- this.spec = (JoinSpec) this.getSpec();
- }
-
- private boolean hasPendingChanges() {
- return getPendingChanges() != null;
- }
-
- private Relation getPendingChanges() {
- // TODO Auto-generated method stub
- // return any pending changes that have not been processed yet
- return null;
- }
-
- private Relation getOutputChanges() {
- // TODO Auto-generated method stub
- return null;
- }
-
- private boolean hasOutputChanges() {
- // TODO Auto-generated method stub
- return getOutputChanges() != null;
- }
-
- private void join(Relation deltaRelation) {
- // TODO Auto-generated method stub
- // implement the join logic
- // 1. calculate the delta changes in <code>output</code>
- // 2. check output condition to see whether the current input should trigger an output
- // 3. set the output changes and pending changes
- }
-
- @Override
- public void init(Config config, TaskContext context) throws Exception {
- for (EntityName relation : this.spec.getInputNames()) {
- inputs.add((Relation) context.getStore(relation.toString()));
- }
- this.output = (Relation) context.getStore(this.spec.getOutputName().toString());
- }
-
- @Override
- public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
- SqlMessageCollector sqlCollector = (SqlMessageCollector) collector;
- if (hasPendingChanges()) {
- sqlCollector.send(getPendingChanges());
- }
- sqlCollector.timeout(this.spec.getOutputNames());
- }
-
- @Override
- public void process(Relation deltaRelation, SqlMessageCollector collector) throws Exception {
- // calculate join based on the input <code>deltaRelation</code>
- join(deltaRelation);
- if (hasOutputChanges()) {
- collector.send(getOutputChanges());
- }
- }
-}