You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/12 23:10:51 UTC
[2/3] samza git commit: Revert "SAMZA-482;
create samza-sql module, and add a basic set of non-functional
operators into it"
http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
deleted file mode 100644
index b4b0e59..0000000
--- a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-
-
-/**
- * Example implementation of <code>SqlMessageCollector</code> that stores the outputs from the operators in a <code>KeyValueStore</code>, keyed by the output's entity name
- *
- */
-public class StoreMessageCollector implements SqlMessageCollector {
-
- private final KeyValueStore<EntityName, List<Object>> outputStore;
-
- public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) {
- this.outputStore = store;
- }
-
- @Override
- public void send(Relation deltaRelation) throws Exception {
- saveOutput(deltaRelation.getName(), deltaRelation);
- }
-
- @Override
- public void send(Tuple tuple) throws Exception {
- saveOutput(tuple.getStreamName(), tuple);
- }
-
- @Override
- public void timeout(List<EntityName> outputs) throws Exception {
- // TODO Auto-generated method stub
- }
-
- public List<Object> removeOutput(EntityName id) {
- List<Object> output = outputStore.get(id);
- outputStore.delete(id);
- return output;
- }
-
- private void saveOutput(EntityName name, Object output) {
- if (this.outputStore.get(name) == null) {
- this.outputStore.put(name, new ArrayList<Object>());
- }
- List<Object> outputs = this.outputStore.get(name);
- outputs.add(output);
- }
-
- @Override
- public void send(OutgoingMessageEnvelope envelope) {
- saveOutput(
- EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":" + envelope.getSystemStream().getStream()),
- envelope);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
deleted file mode 100644
index 4ec7dbb..0000000
--- a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrates a SQL join operation that joins two streams together using the following operations:
- * <p>a. the two streams are each processed by a window operator to convert to relations
- * <p>b. a join operator is applied on the two relations to generate join results
- * <p>c. finally, the join results are sent out to the system output
- *
- */
-public class RandomOperatorTask implements StreamTask, InitableTask, WindowableTask {
- private KeyValueStore<EntityName, List<Object>> opOutputStore;
- private BoundedTimeWindow wndOp1;
- private BoundedTimeWindow wndOp2;
- private Join joinOp;
-
- private BoundedTimeWindow getWindowOp(EntityName streamName) {
- if (streamName.equals(EntityName.getStreamName("kafka:stream1"))) {
- return this.wndOp1;
- } else if (streamName.equals(EntityName.getStreamName("kafka:stream2"))) {
- return this.wndOp2;
- }
-
- throw new IllegalArgumentException("No window operator found for stream: " + streamName);
- }
-
- private void processJoinOutput(List<Object> outputs, MessageCollector collector) {
- // get each tuple in the join operator's outputs and send it to system stream
- for (Object joinOutput : outputs) {
- for (KeyValueIterator<Object, Tuple> iter = ((Relation) joinOutput).all(); iter.hasNext();) {
- Tuple otuple = iter.next().getValue();
- collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "joinOutput1"), otuple.getKey(), otuple
- .getMessage()));
- }
- }
- }
-
- @Override
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
- throws Exception {
- // create the StoreMessageCollector
- StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
- // construct the input tuple
- IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-
- // based on tuple's stream name, get the window op and run process()
- BoundedTimeWindow wndOp = getWindowOp(ituple.getStreamName());
- wndOp.process(ituple, sqlCollector);
- List<Object> wndOutputs = sqlCollector.removeOutput(wndOp.getSpec().getOutputNames().get(0));
- if (wndOutputs.isEmpty()) {
- return;
- }
-
- // process all output from the window operator
- for (Object input : wndOutputs) {
- Relation relation = (Relation) input;
- this.joinOp.process(relation, sqlCollector);
- }
- // get the output from the join operator and send them
- processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-
- }
-
- @Override
- public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
- // create the StoreMessageCollector
- StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
- // trigger timeout event on both window operators
- this.wndOp1.window(sqlCollector, coordinator);
- this.wndOp2.window(sqlCollector, coordinator);
-
- // for all outputs from the window operators, call joinOp.process()
- for (Object input : sqlCollector.removeOutput(this.wndOp1.getSpec().getOutputNames().get(0))) {
- Relation relation = (Relation) input;
- this.joinOp.process(relation, sqlCollector);
- }
- for (Object input : sqlCollector.removeOutput(this.wndOp2.getSpec().getOutputNames().get(0))) {
- Relation relation = (Relation) input;
- this.joinOp.process(relation, sqlCollector);
- }
-
- // get the output from the join operator and send them
- processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(Config config, TaskContext context) throws Exception {
- // 1. create a fixed length 10 sec window operator
- this.wndOp1 = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1");
- this.wndOp2 = new BoundedTimeWindow("wndOp2", 10, "kafka:stream2", "relation2");
- // 2. create a join operation
- List<String> inputRelations = new ArrayList<String>();
- inputRelations.add("relation1");
- inputRelations.add("relation2");
- List<String> joinKeys = new ArrayList<String>();
- joinKeys.add("key1");
- joinKeys.add("key2");
- this.joinOp = new Join("joinOp", inputRelations, "joinOutput", joinKeys);
- // Finally, initialize all operators
- this.opOutputStore =
- (KeyValueStore<EntityName, List<Object>>) context.getStore("samza-sql-operator-output-kvstore");
- this.wndOp1.init(config, context);
- this.wndOp2.init(config, context);
- this.joinOp.init(config, context);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
deleted file mode 100644
index 4796fa6..0000000
--- a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.factory.SimpleOperatorFactoryImpl;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.operators.window.WindowSpec;
-import org.apache.samza.sql.router.SimpleRouter;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrates a SQL join operation that joins two streams together using the following operations:
- * <ul>
- * <li>a. the two streams are each processed by a window operator to convert to relations
- * <li>b. a join operator is applied on the two relations to generate join results
- * <li>c. an istream operator is applied on the join output and converts the relation into a stream
- * <li>d. a partition operator re-partitions the output stream from istream and sends the stream to system output
- * </ul>
- *
- * This example also uses an implementation of <code>SqlMessageCollector</code> (see <code>OperatorMessageCollector</code>)
- * that uses <code>OperatorRouter</code> to automatically execute the whole path that connects the operators together.
- */
-public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
-
- private OperatorRouter rteCntx;
-
- @Override
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
- throws Exception {
- SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
- IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
- for (Iterator<TupleOperator> iter = this.rteCntx.getTupleOperators(ituple.getStreamName()).iterator(); iter
- .hasNext();) {
- iter.next().process(ituple, opCollector);
- }
-
- }
-
- @Override
- public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
- SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
- for (EntityName entity : this.rteCntx.getSystemInputs()) {
- for (Iterator<Operator> iter = this.rteCntx.getNextOperators(entity).iterator(); iter.hasNext();) {
- iter.next().window(opCollector, coordinator);
- }
- }
-
- }
-
- @Override
- public void init(Config config, TaskContext context) throws Exception {
- // create specification of all operators first
- // 1. create 2 window specifications that define 2 windows of fixed length of 10 seconds
- final WindowSpec spec1 =
- new WindowSpec("fixedWnd1", EntityName.getStreamName("inputStream1"),
- EntityName.getRelationName("fixedWndOutput1"), 10);
- final WindowSpec spec2 =
- new WindowSpec("fixedWnd2", EntityName.getStreamName("inputStream2"),
- EntityName.getRelationName("fixedWndOutput2"), 10);
- // 2. create a join specification that join the output from 2 window operators together
- @SuppressWarnings("serial")
- List<EntityName> inputRelations = new ArrayList<EntityName>() {
- {
- add(spec1.getOutputName());
- add(spec2.getOutputName());
- }
- };
- @SuppressWarnings("serial")
- List<String> joinKeys = new ArrayList<String>() {
- {
- add("key1");
- add("key2");
- }
- };
- JoinSpec joinSpec = new JoinSpec("joinOp", inputRelations, EntityName.getRelationName("joinOutput"), joinKeys);
- // 3. create the specification of an istream operator that convert the output from join to a stream
- InsertStreamSpec istrmSpec =
- new InsertStreamSpec("istremOp", joinSpec.getOutputName(), EntityName.getStreamName("istrmOutput1"));
- // 4. create the specification of a partition operator that re-partitions the stream based on <code>joinKey</code>
- PartitionSpec parSpec =
- new PartitionSpec("parOp1", istrmSpec.getOutputName().getName(), new SystemStream("kafka", "parOutputStrm1"),
- "joinKey", 50);
-
- // create all operators via the operator factory
- // 1. create two window operators
- SimpleOperatorFactoryImpl operatorFactory = new SimpleOperatorFactoryImpl();
- BoundedTimeWindow wnd1 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec1);
- BoundedTimeWindow wnd2 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec2);
- // 2. create one join operator
- Join join = (Join) operatorFactory.getRelationOperator(joinSpec);
- // 3. create one stream operator
- InsertStream istream = (InsertStream) operatorFactory.getRelationOperator(istrmSpec);
- // 4. create a re-partition operator
- PartitionOp par = (PartitionOp) operatorFactory.getTupleOperator(parSpec);
-
- // Now, connecting the operators via the OperatorRouter
- this.rteCntx = new SimpleRouter();
- // 1. set two system input operators (i.e. two window operators)
- this.rteCntx.addTupleOperator(spec1.getInputName(), wnd1);
- this.rteCntx.addTupleOperator(spec2.getInputName(), wnd2);
- // 2. connect join operator to both window operators
- this.rteCntx.addRelationOperator(spec1.getOutputName(), join);
- this.rteCntx.addRelationOperator(spec2.getOutputName(), join);
- // 3. connect stream operator to the join operator
- this.rteCntx.addRelationOperator(joinSpec.getOutputName(), istream);
- // 4. connect re-partition operator to the stream operator
- this.rteCntx.addTupleOperator(istrmSpec.getOutputName(), par);
- // 5. set the system inputs
- this.rteCntx.addSystemInput(spec1.getInputName());
- this.rteCntx.addSystemInput(spec2.getInputName());
-
- for (Iterator<Operator> iter = this.rteCntx.iterator(); iter.hasNext();) {
- iter.next().init(config, context);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 08e548c..bb07a3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -26,8 +26,7 @@ include \
'samza-log4j',
'samza-shell',
'samza-yarn',
- 'samza-test',
- 'samza-sql'
+ 'samza-test'
rootProject.children.each {
if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {