You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/21 00:10:54 UTC

[1/3] storm git commit: STORM-1221. Create a common interface for all Trident spout.

Repository: storm
Updated Branches:
  refs/heads/master 7fb80a1d1 -> a8d253a9b


STORM-1221. Create a common interface for all Trident spout.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20374cef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20374cef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20374cef

Branch: refs/heads/master
Commit: 20374cef0b48c71cdbab687e2463528dba0055cb
Parents: ad0c4ef
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Nov 19 14:28:55 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Nov 20 13:18:20 2015 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/trident/TridentTopology.java  | 17 ++++++++++++-
 .../jvm/storm/trident/spout/IBatchSpout.java    |  2 +-
 .../spout/IOpaquePartitionedTridentSpout.java   |  3 ++-
 .../trident/spout/IPartitionedTridentSpout.java |  2 +-
 .../storm/trident/spout/ITridentDataSource.java | 26 ++++++++++++++++++++
 .../jvm/storm/trident/spout/ITridentSpout.java  |  2 +-
 6 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/20374cef/storm-core/src/jvm/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/TridentTopology.java b/storm-core/src/jvm/storm/trident/TridentTopology.java
index 1d9c867..5dfac1c 100644
--- a/storm-core/src/jvm/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/storm/trident/TridentTopology.java
@@ -63,6 +63,7 @@ import storm.trident.spout.BatchSpoutExecutor;
 import storm.trident.spout.IBatchSpout;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
+import storm.trident.spout.ITridentDataSource;
 import storm.trident.spout.ITridentSpout;
 import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor;
 import storm.trident.spout.PartitionedTridentSpoutExecutor;
@@ -127,7 +128,21 @@ public class TridentTopology {
     public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
         return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
     }
-    
+
+    public Stream newStream(String txId, ITridentDataSource dataSource) {
+        if (dataSource instanceof IBatchSpout) {
+            return newStream(txId, (IBatchSpout) dataSource);
+        } else if (dataSource instanceof ITridentSpout) {
+            return newStream(txId, (ITridentSpout) dataSource);
+        } else if (dataSource instanceof IPartitionedTridentSpout) {
+            return newStream(txId, (IPartitionedTridentSpout) dataSource);
+        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
+            return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
+        } else {
+            throw new UnsupportedOperationException("Unsupported stream");
+        }
+    }
+
     public Stream newDRPCStream(String function) {
         return newDRPCStream(new DRPCSpout(function));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/20374cef/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java b/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java
index d6b0350..9e3a755 100644
--- a/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java
+++ b/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.Map;
 import storm.trident.operation.TridentCollector;
 
-public interface IBatchSpout extends Serializable {
+public interface IBatchSpout extends ITridentDataSource {
     void open(Map conf, TopologyContext context);
     void emitBatch(long batchId, TridentCollector collector);
     void ack(long batchId);

http://git-wip-us.apache.org/repos/asf/storm/blob/20374cef/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index c3d19ef..9a84113 100644
--- a/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -30,7 +30,8 @@ import storm.trident.topology.TransactionAttempt;
  * replay the same batch every time it emits a batch for a transaction id.
  * 
  */
-public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M> extends Serializable {
+public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M>
+    extends ITridentDataSource {
     public interface Coordinator<Partitions> {
         boolean isReady(long txid);
         Partitions getPartitionsForBatch();

http://git-wip-us.apache.org/repos/asf/storm/blob/20374cef/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java b/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
index 56a83bb..21b1d73 100644
--- a/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
+++ b/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
@@ -30,7 +30,7 @@ import storm.trident.topology.TransactionAttempt;
  * brokers. It automates the storing of metadata for each partition to ensure that the same batch
  * is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
  */
-public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, T> extends Serializable {
+public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, T> extends ITridentDataSource {
     public interface Coordinator<Partitions> {
         /**
          * Return the partitions currently in the source of data. The idea is

http://git-wip-us.apache.org/repos/asf/storm/blob/20374cef/storm-core/src/jvm/storm/trident/spout/ITridentDataSource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/ITridentDataSource.java b/storm-core/src/jvm/storm/trident/spout/ITridentDataSource.java
new file mode 100644
index 0000000..aa2335b
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/spout/ITridentDataSource.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.trident.spout;
+
+import java.io.Serializable;
+
+/**
+ * ITridentDataSource marks all spouts that provide data into Trident.
+ */
+public interface ITridentDataSource extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/20374cef/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
index 15e1a5a..3037298 100644
--- a/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
+++ b/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import storm.trident.operation.TridentCollector;
 
 
-public interface ITridentSpout<T> extends Serializable {
+public interface ITridentSpout<T> extends ITridentDataSource {
     interface BatchCoordinator<X> {
         /**
          * Create metadata for this particular transaction id which has never


[2/3] storm git commit: Merge branch 'STORM-1221' of https://github.com/haohui/storm into STORM-1221-V3

Posted by sr...@apache.org.
Merge branch 'STORM-1221' of https://github.com/haohui/storm into STORM-1221-V3


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ee867bdd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ee867bdd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ee867bdd

Branch: refs/heads/master
Commit: ee867bdd9a811e121a7b15d7d72e3d3e35435289
Parents: 7fb80a1 20374ce
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Nov 20 15:00:41 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Nov 20 15:00:41 2015 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/trident/TridentTopology.java  | 17 ++++++++++++-
 .../jvm/storm/trident/spout/IBatchSpout.java    |  2 +-
 .../spout/IOpaquePartitionedTridentSpout.java   |  3 ++-
 .../trident/spout/IPartitionedTridentSpout.java |  2 +-
 .../storm/trident/spout/ITridentDataSource.java | 26 ++++++++++++++++++++
 .../jvm/storm/trident/spout/ITridentSpout.java  |  2 +-
 6 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-1221 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1221 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a8d253a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a8d253a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a8d253a9

Branch: refs/heads/master
Commit: a8d253a9b725b8fb3181efbfcac78d06e186bea9
Parents: ee867bd
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri Nov 20 15:01:10 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri Nov 20 15:01:10 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a8d253a9/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7cd2419..a8beeb0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1221: Create a common interface for all Trident spout.
  * STORM-1198: Web UI to show resource usages and Total Resources on all supervisors
  * STORM-1167: Add windowing support for storm core.
  * STORM-1215: Use Async Loggers to avoid locking  and logging overhead