You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/12 23:10:51 UTC

[2/3] samza git commit: Revert "SAMZA-482; create samza-sql module, and add a basic set of non-functional operators into it"

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java b/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
deleted file mode 100644
index b4b0e59..0000000
--- a/samza-sql/src/main/java/org/apache/samza/task/sql/StoreMessageCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-
-
-/**
- * Example implementation of <code>SqlMessageCollector</code> that stores outputs from the operators
- *
- */
-public class StoreMessageCollector implements SqlMessageCollector {
-
-  private final KeyValueStore<EntityName, List<Object>> outputStore;
-
-  public StoreMessageCollector(KeyValueStore<EntityName, List<Object>> store) {
-    this.outputStore = store;
-  }
-
-  @Override
-  public void send(Relation deltaRelation) throws Exception {
-    saveOutput(deltaRelation.getName(), deltaRelation);
-  }
-
-  @Override
-  public void send(Tuple tuple) throws Exception {
-    saveOutput(tuple.getStreamName(), tuple);
-  }
-
-  @Override
-  public void timeout(List<EntityName> outputs) throws Exception {
-    // TODO Auto-generated method stub
-  }
-
-  public List<Object> removeOutput(EntityName id) {
-    List<Object> output = outputStore.get(id);
-    outputStore.delete(id);
-    return output;
-  }
-
-  private void saveOutput(EntityName name, Object output) {
-    if (this.outputStore.get(name) == null) {
-      this.outputStore.put(name, new ArrayList<Object>());
-    }
-    List<Object> outputs = this.outputStore.get(name);
-    outputs.add(output);
-  }
-
-  @Override
-  public void send(OutgoingMessageEnvelope envelope) {
-    saveOutput(
-        EntityName.getStreamName(envelope.getSystemStream().getSystem() + ":" + envelope.getSystemStream().getStream()),
-        envelope);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
deleted file mode 100644
index 4ec7dbb..0000000
--- a/samza-sql/src/test/java/org/apache/samza/task/sql/RandomOperatorTask.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.data.Relation;
-import org.apache.samza.sql.api.data.Tuple;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams together using the following operations:
- * <p>a. the two streams are each processed by a window operator to convert to relations
- * <p>b. a join operator is applied on the two relations to generate join results
- * <p>c. finally, the join results are sent out to the system output
- *
- */
-public class RandomOperatorTask implements StreamTask, InitableTask, WindowableTask {
-  private KeyValueStore<EntityName, List<Object>> opOutputStore;
-  private BoundedTimeWindow wndOp1;
-  private BoundedTimeWindow wndOp2;
-  private Join joinOp;
-
-  private BoundedTimeWindow getWindowOp(EntityName streamName) {
-    if (streamName.equals(EntityName.getStreamName("kafka:stream1"))) {
-      return this.wndOp1;
-    } else if (streamName.equals(EntityName.getStreamName("kafka:stream2"))) {
-      return this.wndOp2;
-    }
-
-    throw new IllegalArgumentException("No window operator found for stream: " + streamName);
-  }
-
-  private void processJoinOutput(List<Object> outputs, MessageCollector collector) {
-    // get each tuple in the join operator's outputs and send it to system stream
-    for (Object joinOutput : outputs) {
-      for (KeyValueIterator<Object, Tuple> iter = ((Relation) joinOutput).all(); iter.hasNext();) {
-        Tuple otuple = iter.next().getValue();
-        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "joinOutput1"), otuple.getKey(), otuple
-            .getMessage()));
-      }
-    }
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    // create the StoreMessageCollector
-    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
-    // construct the input tuple
-    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-
-    // based on tuple's stream name, get the window op and run process()
-    BoundedTimeWindow wndOp = getWindowOp(ituple.getStreamName());
-    wndOp.process(ituple, sqlCollector);
-    List<Object> wndOutputs = sqlCollector.removeOutput(wndOp.getSpec().getOutputNames().get(0));
-    if (wndOutputs.isEmpty()) {
-      return;
-    }
-
-    // process all output from the window operator
-    for (Object input : wndOutputs) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-    // get the output from the join operator and send them
-    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    // create the StoreMessageCollector
-    StoreMessageCollector sqlCollector = new StoreMessageCollector(this.opOutputStore);
-
-    // trigger timeout event on both window operators
-    this.wndOp1.window(sqlCollector, coordinator);
-    this.wndOp2.window(sqlCollector, coordinator);
-
-    // for all outputs from the window operators, call joinOp.process()
-    for (Object input : sqlCollector.removeOutput(this.wndOp1.getSpec().getOutputNames().get(0))) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-    for (Object input : sqlCollector.removeOutput(this.wndOp2.getSpec().getOutputNames().get(0))) {
-      Relation relation = (Relation) input;
-      this.joinOp.process(relation, sqlCollector);
-    }
-
-    // get the output from the join operator and send them
-    processJoinOutput(sqlCollector.removeOutput(this.joinOp.getSpec().getOutputNames().get(0)), collector);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // 1. create a fixed length 10 sec window operator
-    this.wndOp1 = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1");
-    this.wndOp2 = new BoundedTimeWindow("wndOp2", 10, "kafka:stream2", "relation2");
-    // 2. create a join operation
-    List<String> inputRelations = new ArrayList<String>();
-    inputRelations.add("relation1");
-    inputRelations.add("relation2");
-    List<String> joinKeys = new ArrayList<String>();
-    joinKeys.add("key1");
-    joinKeys.add("key2");
-    this.joinOp = new Join("joinOp", inputRelations, "joinOutput", joinKeys);
-    // Finally, initialize all operators
-    this.opOutputStore =
-        (KeyValueStore<EntityName, List<Object>>) context.getStore("samza-sql-operator-output-kvstore");
-    this.wndOp1.init(config, context);
-    this.wndOp2.init(config, context);
-    this.joinOp.init(config, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
deleted file mode 100644
index 4796fa6..0000000
--- a/samza-sql/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task.sql;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.sql.api.data.EntityName;
-import org.apache.samza.sql.api.operators.Operator;
-import org.apache.samza.sql.api.operators.TupleOperator;
-import org.apache.samza.sql.api.router.OperatorRouter;
-import org.apache.samza.sql.data.IncomingMessageTuple;
-import org.apache.samza.sql.operators.factory.SimpleOperatorFactoryImpl;
-import org.apache.samza.sql.operators.partition.PartitionOp;
-import org.apache.samza.sql.operators.partition.PartitionSpec;
-import org.apache.samza.sql.operators.relation.Join;
-import org.apache.samza.sql.operators.relation.JoinSpec;
-import org.apache.samza.sql.operators.stream.InsertStream;
-import org.apache.samza.sql.operators.stream.InsertStreamSpec;
-import org.apache.samza.sql.operators.window.BoundedTimeWindow;
-import org.apache.samza.sql.operators.window.WindowSpec;
-import org.apache.samza.sql.router.SimpleRouter;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-
-/***
- * This example illustrate a SQL join operation that joins two streams together using the folowing operations:
- * <ul>
- * <li>a. the two streams are each processed by a window operator to convert to relations
- * <li>b. a join operator is applied on the two relations to generate join results
- * <li>c. an istream operator is applied on join output and convert the relation into a stream
- * <li>d. a partition operator that re-partitions the output stream from istream and send the stream to system output
- * </ul>
- *
- * This example also uses an implementation of <code>SqlMessageCollector</code> (@see <code>OperatorMessageCollector</code>)
- * that uses <code>OperatorRouter</code> to automatically execute the whole paths that connects operators together.
- */
-public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
-
-  private OperatorRouter rteCntx;
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
-    IncomingMessageTuple ituple = new IncomingMessageTuple(envelope);
-    for (Iterator<TupleOperator> iter = this.rteCntx.getTupleOperators(ituple.getStreamName()).iterator(); iter
-        .hasNext();) {
-      iter.next().process(ituple, opCollector);
-    }
-
-  }
-
-  @Override
-  public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    SqlMessageCollector opCollector = new OperatorMessageCollector(collector, coordinator, this.rteCntx);
-
-    for (EntityName entity : this.rteCntx.getSystemInputs()) {
-      for (Iterator<Operator> iter = this.rteCntx.getNextOperators(entity).iterator(); iter.hasNext();) {
-        iter.next().window(opCollector, coordinator);
-      }
-    }
-
-  }
-
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    // create specification of all operators first
-    // 1. create 2 window specifications that define 2 windows of fixed length of 10 seconds
-    final WindowSpec spec1 =
-        new WindowSpec("fixedWnd1", EntityName.getStreamName("inputStream1"),
-            EntityName.getRelationName("fixedWndOutput1"), 10);
-    final WindowSpec spec2 =
-        new WindowSpec("fixedWnd2", EntityName.getStreamName("inputStream2"),
-            EntityName.getRelationName("fixedWndOutput2"), 10);
-    // 2. create a join specification that join the output from 2 window operators together
-    @SuppressWarnings("serial")
-    List<EntityName> inputRelations = new ArrayList<EntityName>() {
-      {
-        add(spec1.getOutputName());
-        add(spec2.getOutputName());
-      }
-    };
-    @SuppressWarnings("serial")
-    List<String> joinKeys = new ArrayList<String>() {
-      {
-        add("key1");
-        add("key2");
-      }
-    };
-    JoinSpec joinSpec = new JoinSpec("joinOp", inputRelations, EntityName.getRelationName("joinOutput"), joinKeys);
-    // 3. create the specification of an istream operator that convert the output from join to a stream
-    InsertStreamSpec istrmSpec =
-        new InsertStreamSpec("istremOp", joinSpec.getOutputName(), EntityName.getStreamName("istrmOutput1"));
-    // 4. create the specification of a partition operator that re-partitions the stream based on <code>joinKey</code>
-    PartitionSpec parSpec =
-        new PartitionSpec("parOp1", istrmSpec.getOutputName().getName(), new SystemStream("kafka", "parOutputStrm1"),
-            "joinKey", 50);
-
-    // create all operators via the operator factory
-    // 1. create two window operators
-    SimpleOperatorFactoryImpl operatorFactory = new SimpleOperatorFactoryImpl();
-    BoundedTimeWindow wnd1 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec1);
-    BoundedTimeWindow wnd2 = (BoundedTimeWindow) operatorFactory.getTupleOperator(spec2);
-    // 2. create one join operator
-    Join join = (Join) operatorFactory.getRelationOperator(joinSpec);
-    // 3. create one stream operator
-    InsertStream istream = (InsertStream) operatorFactory.getRelationOperator(istrmSpec);
-    // 4. create a re-partition operator
-    PartitionOp par = (PartitionOp) operatorFactory.getTupleOperator(parSpec);
-
-    // Now, connecting the operators via the OperatorRouter
-    this.rteCntx = new SimpleRouter();
-    // 1. set two system input operators (i.e. two window operators)
-    this.rteCntx.addTupleOperator(spec1.getInputName(), wnd1);
-    this.rteCntx.addTupleOperator(spec2.getInputName(), wnd2);
-    // 2. connect join operator to both window operators
-    this.rteCntx.addRelationOperator(spec1.getOutputName(), join);
-    this.rteCntx.addRelationOperator(spec2.getOutputName(), join);
-    // 3. connect stream operator to the join operator
-    this.rteCntx.addRelationOperator(joinSpec.getOutputName(), istream);
-    // 4. connect re-partition operator to the stream operator
-    this.rteCntx.addTupleOperator(istrmSpec.getOutputName(), par);
-    // 5. set the system inputs
-    this.rteCntx.addSystemInput(spec1.getInputName());
-    this.rteCntx.addSystemInput(spec2.getInputName());
-
-    for (Iterator<Operator> iter = this.rteCntx.iterator(); iter.hasNext();) {
-      iter.next().init(config, context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6743df31/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 08e548c..bb07a3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -26,8 +26,7 @@ include \
   'samza-log4j',
   'samza-shell',
   'samza-yarn',
-  'samza-test',
-  'samza-sql'
+  'samza-test'
 
 rootProject.children.each {
   if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {