You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 07:11:24 UTC
[02/11] storm git commit: STORM-676 Upmerged and resolved conflicts
STORM-676 Upmerged and resolved conflicts
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87a2d923
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87a2d923
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87a2d923
Branch: refs/heads/master
Commit: 87a2d92377413bca47258abb8e4ddab8fad9e8e2
Parents: a91abdb
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 12:20:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Mar 23 12:20:21 2016 +0530
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 44 ++-
.../TridentHBaseWindowingStoreTopology.java | 124 +++++++++
.../TridentWindowingInmemoryStoreTopology.java | 134 +++++++++
.../trident/windowing/HBaseWindowsStore.java | 275 +++++++++++++++++++
.../windowing/HBaseWindowsStoreFactory.java | 51 ++++
storm-core/src/jvm/org/apache/storm/Config.java | 8 +
.../jvm/org/apache/storm/trident/Stream.java | 223 +++++++++++++--
.../apache/storm/trident/TridentTopology.java | 4 +
.../storm/trident/fluent/UniqueIdGen.java | 14 +-
.../storm/trident/operation/builtin/Debug.java | 4 +-
.../windowing/AbstractTridentWindowManager.java | 241 ++++++++++++++++
.../windowing/ITridentWindowManager.java | 59 ++++
.../windowing/InMemoryTridentWindowManager.java | 78 ++++++
.../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++
.../windowing/InMemoryWindowsStoreFactory.java | 37 +++
.../StoreBasedTridentWindowManager.java | 223 +++++++++++++++
.../trident/windowing/TridentBatchTuple.java | 42 +++
.../windowing/WindowTridentProcessor.java | 260 ++++++++++++++++++
.../storm/trident/windowing/WindowsState.java | 52 ++++
.../trident/windowing/WindowsStateFactory.java | 40 +++
.../trident/windowing/WindowsStateUpdater.java | 81 ++++++
.../storm/trident/windowing/WindowsStore.java | 78 ++++++
.../trident/windowing/WindowsStoreFactory.java | 35 +++
.../windowing/config/BaseWindowConfig.java | 48 ++++
.../windowing/config/SlidingCountWindow.java | 40 +++
.../windowing/config/SlidingDurationWindow.java | 42 +++
.../windowing/config/TumblingCountWindow.java | 39 +++
.../config/TumblingDurationWindow.java | 40 +++
.../trident/windowing/config/WindowConfig.java | 55 ++++
.../windowing/strategy/BaseWindowStrategy.java | 32 +++
.../strategy/SlidingCountWindowStrategy.java | 59 ++++
.../strategy/SlidingDurationWindowStrategy.java | 60 ++++
.../strategy/TumblingCountWindowStrategy.java | 60 ++++
.../TumblingDurationWindowStrategy.java | 60 ++++
.../windowing/strategy/WindowStrategy.java | 45 +++
.../strategy/WindowStrategyFactory.java | 60 ++++
.../apache/storm/windowing/TriggerHandler.java | 2 +-
.../storm/trident/TridentWindowingTest.java | 110 ++++++++
38 files changed, 3019 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 929c8ea..638d07a 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -31,10 +31,13 @@
<name>storm-starter</name>
<properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <!-- see comment below... This fixes an annoyance with intellij -->
- <provided.scope>provided</provided.scope>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- see comment below... This fixes an annoyance with intellij -->
+ <provided.scope>provided</provided.scope>
+ <!-- version of the hbase-server and hbase-client dependencies declared below -->
+ <hbase.version>1.1.2</hbase.version>
</properties>
+
<profiles>
<profile>
<id>intellij</id>
@@ -149,6 +152,11 @@
<artifactId>storm-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
@@ -177,6 +185,36 @@
<artifactId>storm-redis</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
new file mode 100644
index 0000000..24cc8d9
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sample Trident topology that counts words over a tumbling count window whose tuples are stored in HBase
+ * through {@link HBaseWindowsStoreFactory}.
+ */
+public class TridentHBaseWindowingStoreTopology {
+    private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+    /**
+     * Builds the word-count topology.
+     * @param windowsStore factory creating the store that keeps the windowed tuples
+     * @return the built topology
+     */
+    public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+
+        // The fluent calls register their nodes on the topology; the resulting Stream itself is not needed.
+        topology.newStream("spout1", spout).parallelismHint(16)
+                .each(new Fields("sentence"), new Split(), new Fields("word"))
+                .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+                .each(new Fields("count"), new Debug())
+                .each(new Fields("count"), new Echo(), new Fields("ct"));
+
+        return topology.build();
+    }
+
+    /** Splits each sentence on single spaces and emits one tuple per word. */
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    /** Logs every tuple it receives and re-emits its values unchanged. */
+    static class Echo extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            log.info("##########Echo.execute: {}", tuple);
+            collector.emit(tuple.getValues());
+        }
+    }
+
+    /**
+     * Runs the topology on a {@link LocalCluster} for two minutes when no arguments are given; otherwise submits it
+     * to the cluster using {@code args[0]} as the topology name.
+     */
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+
+        // window-state table should already be created with cf:tuples column
+        HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            String topologyName = "wordCounterWithWindowing";
+            cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory));
+            Utils.sleep(120 * 1000);
+            cluster.killTopology(topologyName);
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
new file mode 100644
index 0000000..5f0cb4f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sample Trident topology that counts words over a configurable window; {@link #main} keeps the windowed tuples in
+ * an {@link InMemoryWindowsStoreFactory in-memory store}.
+ */
+public class TridentWindowingInmemoryStoreTopology {
+    private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+
+    /**
+     * Builds the word-count topology.
+     * @param windowStore  factory creating the store that keeps the windowed tuples
+     * @param windowConfig window applied to the stream of words
+     * @return the built topology
+     */
+    public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+
+        // The fluent calls register their nodes on the topology; the resulting Stream itself is not needed.
+        topology.newStream("spout1", spout).parallelismHint(16)
+                .each(new Fields("sentence"), new Split(), new Fields("word"))
+                .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+                .each(new Fields("count"), new Debug())
+                .each(new Fields("count"), new Echo(), new Fields("ct"))
+                .each(new Fields("ct"), new Debug());
+
+        return topology.build();
+    }
+
+    /** Splits each sentence on single spaces and emits one tuple per word. */
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    /** Logs every tuple it receives and re-emits its values unchanged. */
+    static class Echo extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            log.info("##########Echo.execute: {}", tuple);
+            collector.emit(tuple.getValues());
+        }
+    }
+
+    /**
+     * With no arguments, runs the topology for one minute on a fresh {@link LocalCluster} for each sample window
+     * configuration; otherwise submits it with a sliding count window using {@code args[0]} as the topology name.
+     */
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
+
+        if (args.length == 0) {
+            List<? extends WindowConfig> list = Arrays.asList(
+                    SlidingCountWindow.of(1000, 100),
+                    TumblingCountWindow.of(1000),
+                    SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS)),
+                    TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS)));
+
+            for (WindowConfig windowConfig : list) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
+                Utils.sleep(60 * 1000);
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
new file mode 100644
index 0000000..47879a4
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@link WindowsStore} implementation that keeps its entries in an HBase table.
+ * <p/>
+ * Each value is serialized with Kryo ({@code writeClassAndObject}) and written to the row whose key is the entry key
+ * encoded as UTF-8, in the cell identified by the configured column family and qualifier.
+ * <p/>
+ * One {@link HTable} is created lazily per calling thread through a {@link ThreadLocal}; every table created this way
+ * is remembered so that {@link #shutdown()} can close it.
+ */
+public class HBaseWindowsStore implements WindowsStore {
+    private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+    public static final String UTF_8 = "utf-8";
+
+    // initial capacity of the Kryo output buffer; the buffer grows without limit when a value needs more room
+    private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 4096;
+
+    private final ThreadLocal<HTable> threadLocalHtable;
+    // every table handed out by threadLocalHtable, closed in shutdown()
+    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    /**
+     * @param config    HBase configuration used to open the table
+     * @param tableName name of the table holding the entries
+     * @param family    column family of the cells holding the values
+     * @param qualifier column qualifier of the cells holding the values
+     */
+    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        threadLocalHtable = new ThreadLocal<HTable>() {
+            @Override
+            protected HTable initialValue() {
+                try {
+                    HTable hTable = new HTable(config, tableName);
+                    htables.add(hTable);
+                    return hTable;
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    private HTable htable() {
+        return threadLocalHtable.get();
+    }
+
+    private byte[] effectiveKey(String key) {
+        try {
+            return key.getBytes(UTF_8);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes {@code value} with Kryo and returns exactly the written bytes.
+     * <p/>
+     * A growable, stream-less {@link Output} is used on purpose: an {@code Output} wrapping an {@code OutputStream}
+     * flushes its buffer to that stream and rewinds it whenever it fills up, so {@code getBuffer()}/{@code position()}
+     * would only expose the last chunk of a value larger than the buffer.
+     */
+    private static byte[] serialize(Kryo kryo, Object value) {
+        Output output = new Output(INITIAL_SERIALIZATION_BUFFER_SIZE, -1);
+        kryo.writeClassAndObject(output, value);
+        return output.toBytes();
+    }
+
+    // deserializes the value cell (family:qualifier) of a non-empty result
+    private Object deserialize(Kryo kryo, Result result) {
+        return kryo.readClassAndObject(new Input(result.getValue(family, qualifier)));
+    }
+
+    // builds a Put that writes the serialized value of one entry, timestamped with the current time
+    private Put createPut(Kryo kryo, String key, Object value) {
+        Put put = new Put(effectiveKey(key));
+        put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(serialize(kryo, value)));
+        return put;
+    }
+
+    /**
+     * Returns the value stored for {@code key}, or {@code null} when no row exists for it.
+     */
+    @Override
+    public Object get(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        Result result;
+        try {
+            result = htable().get(new Get(effectiveKey(key)));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (result.isEmpty()) {
+            return null;
+        }
+        return deserialize(new Kryo(), result);
+    }
+
+    /**
+     * Returns the values stored for {@code keys}, position by position.
+     *
+     * @throws RuntimeException if no row exists for one of the keys
+     */
+    @Override
+    public Iterable<Object> get(List<String> keys) {
+        List<Get> gets = new ArrayList<>();
+        for (String key : keys) {
+            WindowsStore.Entry.nonNullCheckForKey(key);
+            gets.add(new Get(effectiveKey(key)));
+        }
+
+        Result[] results;
+        try {
+            results = htable().get(gets);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        Kryo kryo = new Kryo();
+        List<Object> values = new ArrayList<>();
+        for (int i = 0; i < results.length; i++) {
+            Result result = results[i];
+            if (result.isEmpty()) {
+                log.error("Got empty result for key [{}]", keys.get(i));
+                throw new RuntimeException("Received empty result for key: " + keys.get(i));
+            }
+            values.add(deserialize(kryo, result));
+        }
+
+        return values;
+    }
+
+    /**
+     * Returns the row keys of all stored entries.
+     * <p/>
+     * The HBase scanner is opened by this call and closed as soon as the iterator reports that it is exhausted. The
+     * returned {@link Iterable} always hands out that same iterator, so the keys can be traversed only once.
+     */
+    @Override
+    public Iterable<String> getAllKeys() {
+        Scan scan = new Scan();
+        // this filter makes sure to receive only Key or row but not values associated with those rows.
+        scan.setFilter(new FirstKeyOnlyFilter());
+
+        final org.apache.hadoop.hbase.client.ResultScanner scanner;
+        try {
+            scanner = htable().getScanner(scan);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        final Iterator<Result> resultIterator = scanner.iterator();
+
+        final Iterator<String> iterator = new Iterator<String>() {
+            @Override
+            public boolean hasNext() {
+                boolean hasNext = resultIterator.hasNext();
+                if (!hasNext) {
+                    // release the scanner once every row has been read instead of leaking it
+                    scanner.close();
+                }
+                return hasNext;
+            }
+
+            @Override
+            public String next() {
+                Result result = resultIterator.next();
+                try {
+                    return new String(result.getRow(), UTF_8);
+                } catch (UnsupportedEncodingException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove operation is not supported");
+            }
+        };
+
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                return iterator;
+            }
+        };
+    }
+
+    /** Serializes {@code value} and writes it to the row of {@code key}. */
+    @Override
+    public void put(String key, Object value) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+        WindowsStore.Entry.nonNullCheckForValue(value);
+
+        if (value == null) {
+            throw new IllegalArgumentException("Invalid value of null with key: " + key);
+        }
+
+        try {
+            htable().put(createPut(new Kryo(), key, value));
+        } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Serializes all {@code entries} and writes them with a single batched put call. */
+    @Override
+    public void putAll(Collection<Entry> entries) {
+        // one Kryo instance serializes every entry of the batch
+        Kryo kryo = new Kryo();
+        List<Put> list = new ArrayList<>();
+        for (Entry entry : entries) {
+            // same per-entry checks as put()
+            WindowsStore.Entry.nonNullCheckForKey(entry.key);
+            WindowsStore.Entry.nonNullCheckForValue(entry.value);
+            list.add(createPut(kryo, entry.key, entry.value));
+        }
+
+        try {
+            htable().put(list);
+        } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Deletes the row of {@code key}, using the current time as the delete timestamp. */
+    @Override
+    public void remove(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis());
+        try {
+            htable().delete(delete);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Deletes the rows of all {@code keys} with a single batched delete call. */
+    @Override
+    public void removeAll(Collection<String> keys) {
+        List<Delete> deleteBatch = new ArrayList<>();
+        for (String key : keys) {
+            WindowsStore.Entry.nonNullCheckForKey(key);
+            deleteBatch.add(new Delete(effectiveKey(key), System.currentTimeMillis()));
+        }
+        try {
+            htable().delete(deleteBatch);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Closes every {@link HTable} created by this store; a failure to close one is logged and the rest are still closed. */
+    @Override
+    public void shutdown() {
+        // close all the created hTable instances
+        for (HTable htable : htables) {
+            try {
+                htable.close();
+            } catch (IOException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
new file mode 100644
index 0000000..56fad58
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+/**
+ * {@link WindowsStoreFactory} creating {@link HBaseWindowsStore} instances backed by one HBase table.
+ * <p/>
+ * Only plain values are held here; the HBase {@link Configuration} is built inside {@link #create()}.
+ */
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+    private final Map<String, Object> config;
+    private final String tableName;
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    /**
+     * @param config    properties applied on top of {@code HBaseConfiguration.create()}; entries with a {@code null}
+     *                  value are skipped and a {@code null} map is treated as empty
+     * @param tableName name of the table holding the window entries
+     * @param family    column family of the cells holding the values
+     * @param qualifier column qualifier of the cells holding the values
+     */
+    public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
+        this.config = config;
+        this.tableName = tableName;
+        this.family = family;
+        this.qualifier = qualifier;
+    }
+
+    /** Creates a new store; a fresh HBase configuration is built on every call. */
+    @Override
+    public WindowsStore create() {
+        Configuration configuration = HBaseConfiguration.create();
+        // tolerate a null map: it simply means no overrides on top of the default HBase configuration
+        if (config != null) {
+            for (Map.Entry<String, Object> entry : config.entrySet()) {
+                if (entry.getValue() != null) {
+                    configuration.set(entry.getKey(), entry.getValue().toString());
+                }
+            }
+        }
+        return new HBaseWindowsStore(configuration, tableName, family, qualifier);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6ea8b0f..62edc2b 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1971,6 +1971,14 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
/**
+ * Maximum number of tuples that windowing operators keep in their in-memory cache, so that they can be accessed
+ * without fetching them from the store.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT="topology.trident.windowing.cache.tuple.limit";
+
+ /**
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
*/
@isString
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 4a51b56..b3a4446 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.topology.ResourceDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
import org.apache.storm.trident.fluent.GlobalAggregationScheme;
import org.apache.storm.trident.fluent.GroupedStream;
@@ -68,10 +69,22 @@ import org.apache.storm.trident.state.StateSpec;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowTridentProcessor;
+import org.apache.storm.trident.windowing.WindowsStateFactory;
+import org.apache.storm.trident.windowing.WindowsStateUpdater;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
/**
* A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
@@ -92,10 +105,10 @@ import java.util.Comparator;
*/
// TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
- Node _node;
- TridentTopology _topology;
- String _name;
-
+ final Node _node;
+ final String _name;
+ private final TridentTopology _topology;
+
protected Stream(TridentTopology topology, String name, Node node) {
_topology = topology;
_node = node;
@@ -180,7 +193,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
*/
public GroupedStream groupBy(Fields fields) {
projectionValidation(fields);
- return new GroupedStream(this, fields);
+ return new GroupedStream(this, fields);
}
/**
@@ -287,7 +300,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
if(_node instanceof PartitionNode) {
return each(new Fields(), new TrueFilter()).partition(grouping);
} else {
- return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
+ return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
}
}
@@ -323,7 +336,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
functionFields,
new AggregateProcessor(inputFields, agg)));
}
-
+
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
projectionValidation(inputFields);
String stateId = state._node.stateInfo.id;
@@ -335,11 +348,11 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
_topology._colocate.get(stateId).add(n);
return _topology.addSourcedNode(this, n);
}
-
+
public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields) {
return partitionPersist(new StateSpec(stateFactory), inputFields, updater, functionFields);
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {
projectionValidation(inputFields);
String id = _topology.getUniqueStateId();
@@ -352,19 +365,19 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
n.stateInfo = new NodeStateInfo(id, stateSpec);
return _topology.addSourcedStateNode(this, n);
}
-
+
public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater) {
return partitionPersist(stateFactory, inputFields, updater, new Fields());
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater) {
- return partitionPersist(stateSpec, inputFields, updater, new Fields());
+ return partitionPersist(stateSpec, inputFields, updater, new Fields());
}
-
+
public Stream each(Function function, Fields functionFields) {
return each(null, function, functionFields);
}
-
+
public Stream each(Fields inputFields, Filter filter) {
return each(inputFields, new FilterExecutor(filter), new Fields());
}
@@ -448,7 +461,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public ChainedAggregatorDeclarer chainedAgg() {
return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
}
-
+
public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
@@ -462,8 +475,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
- }
-
+ }
+
public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
@@ -565,7 +578,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public Stream aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
-
+
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
@@ -594,19 +607,169 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}
-
+
+    /**
+     * Aggregates the stream over tumbling windows of {@code windowCount} tuples each. The windowing tuples are kept
+     * in memory, see {@link #window(WindowConfig, Fields, Aggregator, Fields)}.
+     *
+     * @param windowCount number of tuples in each window
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        WindowConfig tumblingConfig = TumblingCountWindow.of(windowCount);
+        return window(tumblingConfig, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Aggregates the stream over tumbling windows of {@code windowCount} tuples each, keeping the windowing tuples
+     * in the store created by {@code windowStoreFactory}.
+     *
+     * @param windowCount number of tuples in each window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        WindowConfig tumblingConfig = TumblingCountWindow.of(windowCount);
+        return window(tumblingConfig, windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Aggregates the stream over sliding windows of {@code windowCount} tuples that advance by {@code slideCount}
+     * tuples, keeping the windowing tuples in the store created by {@code windowStoreFactory}.
+     *
+     * @param windowCount number of tuples in each window
+     * @param slideCount number of tuples by which the window slides
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        WindowConfig slidingConfig = SlidingCountWindow.of(windowCount, slideCount);
+        return window(slidingConfig, windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Aggregates the stream over windows that tumble every {@code windowDuration}, keeping the windowing tuples in
+     * the store created by {@code windowStoreFactory}.
+     *
+     * @param windowDuration length of each tumbling window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        WindowConfig tumblingConfig = TumblingDurationWindow.of(windowDuration);
+        return window(tumblingConfig, windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Aggregates the stream over windows of length {@code windowDuration} that slide every {@code slideDuration},
+     * keeping the windowing tuples in the store created by {@code windowStoreFactory}.
+     *
+     * @param windowDuration length of each window
+     * @param slideDuration interval at which the window slides
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+                                    WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        WindowConfig slidingConfig = SlidingDurationWindow.of(windowDuration, slideDuration);
+        return window(slidingConfig, windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Aggregates the stream according to {@code windowConfig}, keeping the windowing tuples in memory.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        // storeTuplesInStore is false here, so the in-memory store created below is only meant to hold the triggered
+        // aggregation results and not the windowing tuples themselves.
+        return window(windowConfig, new InMemoryWindowsStoreFactory(), inputFields, aggregator, functionFields, false);
+    }
+
+    /**
+     * Aggregates the stream according to {@code windowConfig}, keeping the windowing tuples in the store created by
+     * {@code windowStoreFactory}.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param windowStoreFactory intermediary tuple store for storing tuples for windowing
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return a new stream carrying the {@code functionFields} emitted by {@code aggregator} for each window
+     */
+    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+                         Aggregator aggregator, Fields functionFields) {
+        final boolean storeTuplesInStore = true;
+        return window(windowConfig, windowStoreFactory, inputFields, aggregator, functionFields, storeTuplesInStore);
+    }
+
+    /**
+     * Wires the windowing processor into the topology.
+     * <p/>
+     * Adds a {@code WindowTridentProcessor} node whose output is the trigger field followed by {@code functionFields},
+     * projects that node down to {@code functionFields} for the caller, and additionally persists the trigger field
+     * through a {@code WindowsStateUpdater} built from the same {@code windowStoreFactory}.
+     * Statement order matters here: each call below adds nodes to the topology graph.
+     *
+     * @param windowConfig window configuration; validated before use
+     * @param windowStoreFactory store used for windowing tuples and/or trigger results
+     * @param inputFields fields projected for the aggregator; validated against this stream's output fields
+     * @param aggregator aggregator run on each window
+     * @param functionFields fields emitted by the aggregator
+     * @param storeTuplesInStore passed to {@code WindowTridentProcessor}; when false the store only holds triggers
+     * @return the stream projected to {@code functionFields}
+     */
+    private Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator,
+                          Fields functionFields, boolean storeTuplesInStore) {
+        projectionValidation(inputFields);
+        windowConfig.validate();
+
+        // processor output = trigger field + function fields
+        Fields fields = addTriggerField(functionFields);
+
+        // when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
+        // that store is passed to WindowsStateUpdater to remove them after committing the batch.
+        Stream stream = _topology.addSourcedNode(this,
+                new ProcessorNode(_topology.getUniqueStreamId(),
+                        _name,
+                        fields,
+                        fields,
+                        new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
+                                inputFields, aggregator, storeTuplesInStore)));
+
+        // callers only see the aggregated values; the trigger field is consumed by the state updater below.
+        Stream effectiveStream = stream.project(functionFields);
+
+        // create StateUpdater with the given windowStoreFactory to remove triggered aggregation results from store
+        // when they are successfully processed.
+        StateFactory stateFactory = new WindowsStateFactory();
+        StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
+        stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
+
+        return effectiveStream;
+    }
+
+    /**
+     * Builds the output fields of the windowing processor: the trigger field first, then every field of
+     * {@code functionFields} in its original order.
+     *
+     * @param functionFields fields emitted by the window aggregator
+     * @return a new {@code Fields} prefixed with {@code WindowTridentProcessor.TRIGGER_FIELD_NAME}
+     */
+    private Fields addTriggerField(Fields functionFields) {
+        List<String> names = new ArrayList<>();
+        names.add(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+        for (String name : functionFields) {
+            names.add(name);
+        }
+        Fields withTrigger = new Fields(names);
+        return withTrigger;
+    }
+
public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater, Fields functionFields) {
return partitionPersist(new StateSpec(stateFactory), updater, functionFields);
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater, Fields functionFields) {
return partitionPersist(stateSpec, null, updater, functionFields);
}
-
+
public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater) {
return partitionPersist(stateFactory, updater, new Fields());
}
-
+
public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater) {
return partitionPersist(stateSpec, updater, new Fields());
}
@@ -622,7 +785,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}
-
+
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
// replaces normal aggregation here with a global grouping because it needs to be consistent across batches
@@ -648,7 +811,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
projectionValidation(inputFields);
return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
}
-
+
public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
return stateQuery(state, null, function, functionFields);
}
@@ -662,7 +825,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public Fields getOutputFields() {
return _node.allOutputFields;
}
-
+
static class BatchGlobalAggScheme implements GlobalAggregationScheme<Stream> {
@Override
@@ -674,9 +837,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public BatchToPartition singleEmitPartitioner() {
return new IndexHashBatchToPartition();
}
-
+
}
-
+
static class GlobalAggScheme implements GlobalAggregationScheme<Stream> {
@Override
@@ -688,7 +851,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
public BatchToPartition singleEmitPartitioner() {
return new GlobalBatchToPartition();
}
-
+
}
private void projectionValidation(Fields projFields) {
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index e0a349b..06e1576 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -829,6 +829,10 @@ public class TridentTopology {
protected String getUniqueStateId() {
return _gen.getUniqueStateId();
}
+
+    /**
+     * Returns a fresh window id from this topology's id generator.
+     *
+     * @return the next window id produced by {@code _gen}
+     */
+    protected String getUniqueWindowId() {
+        String windowId = _gen.getUniqueWindowId();
+        return windowId;
+    }
protected void registerNode(Node n) {
_graph.addVertex(n);
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
index 1364faf..e063142 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
@@ -18,17 +18,21 @@
package org.apache.storm.trident.fluent;
public class UniqueIdGen {
- int _streamCounter = 0;
-
+ private int _streamCounter = 0;
+ private int _stateCounter = 0;
+ private int windowCounter = 0;
+
public String getUniqueStreamId() {
_streamCounter++;
return "s" + _streamCounter;
}
- int _stateCounter = 0;
-
public String getUniqueStateId() {
_stateCounter++;
return "state" + _stateCounter;
- }
+ }
+
+    /**
+     * Returns the next window id of the form {@code "w<n>"}, where {@code n} starts at 1 and grows by one per call.
+     *
+     * @return the next window id
+     */
+    public String getUniqueWindowId() {
+        windowCounter++;
+        return "w" + windowCounter;
+    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 07c5ae4..87c4167 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -20,6 +20,8 @@ package org.apache.storm.trident.operation.builtin;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
+import java.util.Date;
+
/**
* Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`.
*/
@@ -40,7 +42,7 @@ public class Debug extends BaseFilter {
@Override
public boolean isKeep(TridentTuple tuple) {
- System.out.println(name + tuple.toString());
+ System.out.println("<"+new Date()+"> "+name + tuple.toString());
return true;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
new file mode 100644
index 0000000..2941e28
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ * <p/>
+ * Whenever the {@code WindowManager} activates a window, the events of that window are run through
+ * {@link #aggregator}. The aggregated result and the updated trigger count are written to {@link #windowStore}, and the
+ * result is queued in {@link #pendingTriggers}.
+ *
+ * @param <T> type of the events held by the underlying {@code WindowManager}
+ */
+public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+    protected final WindowManager<T> windowManager;
+    protected final Aggregator aggregator;
+    protected final BatchOutputCollector delegateCollector;
+    protected final String windowTaskId;
+    protected final WindowsStore windowStore;
+
+    // batch ids consulted by subclasses (e.g. in addTuplesBatch) to detect replayed batches.
+    protected final Set<String> activeBatches = new HashSet<>();
+    // aggregation results of fired triggers that have not been emitted yet; exposed through getPendingTriggers().
+    protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
+    // id of the latest trigger: seeded from the store in preInitialize() and incremented on every window activation.
+    protected final AtomicInteger triggerId = new AtomicInteger();
+    // store key under which this window task's trigger count is persisted.
+    private final String windowTriggerCountId;
+    private final TriggerPolicy<T> triggerPolicy;
+
+    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
+        this.windowTaskId = windowTaskId;
+        this.windowStore = windowStore;
+        this.aggregator = aggregator;
+        this.delegateCollector = delegateCollector;
+
+        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+        WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
+        windowManager.setEvictionPolicy(evictionPolicy);
+        // the trigger policy is only started in prepare(), after the store state has been loaded.
+        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+        windowManager.setTriggerPolicy(triggerPolicy);
+    }
+
+    @Override
+    public void prepare() {
+        preInitialize();
+
+        initialize();
+
+        postInitialize();
+    }
+
+    /**
+     * Restores the trigger count from the store and seeds {@link #triggerId} with it.
+     */
+    private void preInitialize() {
+        log.debug("Getting current trigger count for this component/task");
+        // get trigger count value from store; resume one past the stored value
+        // (presumably so that trigger ids are not reused after a restart).
+        Object result = windowStore.get(windowTriggerCountId);
+        int currentCount = 0;
+        if (result == null) {
+            log.info("No current trigger count in windows store.");
+        } else {
+            currentCount = (Integer) result + 1;
+        }
+        windowStore.put(windowTriggerCountId, currentCount);
+        triggerId.set(currentCount);
+    }
+
+    private void postInitialize() {
+        // start trigger once the initialization is done.
+        triggerPolicy.start();
+    }
+
+    /**
+     * Load and initialize any resources into window manager before windowing for component/task is activated.
+     */
+    protected abstract void initialize();
+
+    /**
+     * Listener to receive any activation/expiry of windowing events and take further action on them.
+     */
+    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
+
+        @Override
+        public void onExpiry(List<T> expiredEvents) {
+            log.debug("onExpiry is invoked");
+            onTuplesExpired(expiredEvents);
+        }
+
+        @Override
+        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
+            log.debug("onActivation is invoked with events size: {}", events.size());
+            // trigger occurred, create an aggregation and keep them in store
+            int currentTriggerId = triggerId.incrementAndGet();
+            execAggregatorAndStoreResult(currentTriggerId, events);
+        }
+    }
+
+    /**
+     * Handle expired tuple events which can be removing from cache or store.
+     *
+     * @param expiredEvents events evicted from the window
+     */
+    protected abstract void onTuplesExpired(List<T> expiredEvents);
+
+    /**
+     * Runs the aggregator over the window's events, persists the result together with the next trigger count, and
+     * queues the result as a pending trigger.
+     */
+    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
+        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
+
+        // run aggregator to compute the result
+        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
+        Object state = aggregator.init(currentTriggerId, collector);
+        for (TridentTuple resultTuple : resultTuples) {
+            aggregator.aggregate(state, resultTuple, collector);
+        }
+        aggregator.complete(state, collector);
+
+        List<List<Object>> resultantAggregatedValue = collector.values;
+
+        // trigger count and trigger result are written in a single putAll call.
+        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
+                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
+        windowStore.putAll(entries);
+
+        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
+    }
+
+    /**
+     * Return {@code TridentTuple}s from given {@code tupleEvents}.
+     *
+     * @param tupleEvents events of an activated window
+     * @return the tuples to feed to the aggregator
+     */
+    protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents);
+
+    /**
+     * This {@code TridentCollector} accumulates all the values emitted; errors are forwarded to the delegate collector.
+     */
+    static class AccumulatedTuplesCollector implements TridentCollector {
+
+        final List<List<Object>> values = new ArrayList<>();
+        private final BatchOutputCollector delegateCollector;
+
+        public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) {
+            this.delegateCollector = delegateCollector;
+        }
+
+        @Override
+        public void emit(List<Object> tuple) {
+            values.add(tuple);
+        }
+
+        @Override
+        public void reportError(Throwable t) {
+            delegateCollector.reportError(t);
+        }
+
+    }
+
+    /**
+     * Aggregated result of one trigger. Equality and hash code depend only on {@link #id}.
+     */
+    static class TriggerResult {
+        final int id;
+        final List<List<Object>> result;
+
+        public TriggerResult(int id, List<List<Object>> result) {
+            this.id = id;
+            this.result = result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof TriggerResult)) {
+                return false;
+            }
+            TriggerResult that = (TriggerResult) o;
+            return id == that.id;
+        }
+
+        @Override
+        public int hashCode() {
+            return id;
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerResult{" +
+                    "id=" + id +
+                    ", result=" + result +
+                    '}';
+        }
+    }
+
+    @Override
+    public Queue<TriggerResult> getPendingTriggers() {
+        return pendingTriggers;
+    }
+
+    /**
+     * Shuts down the window manager and then, even if that fails, the window store.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            log.info("window manager [{}] is being shutdown", windowManager);
+            windowManager.shutdown();
+        } finally {
+            log.info("window store [{}] is being shutdown", windowStore);
+            windowStore.shutdown();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
new file mode 100644
index 0000000..d4aef79
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Window manager to handle trident tuple events.
+ */
+public interface ITridentWindowManager {
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s prepare method. So any
+     * initialization tasks can be done before the topology starts accepting tuples. For ex:
+     * initialize window manager with any earlier stored tuples/triggers and start WindowManager
+     */
+    void prepare();
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s cleanup method. So, any
+     * cleanup operations like clearing cache or close store connection etc can be done.
+     */
+    void shutdown();
+
+    /**
+     * Add received batch of tuples to cache/store and add them to {@code WindowManager}
+     *
+     * @param batchId id of the batch the tuples belong to
+     * @param tuples tuples received in the batch
+     */
+    void addTuplesBatch(Object batchId, List<TridentTuple> tuples);
+
+    /**
+     * Returns pending triggers to be emitted.
+     *
+     * @return queue of trigger results that have not been emitted yet
+     */
+    Queue<AbstractTridentWindowManager.TriggerResult> getPendingTriggers();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
new file mode 100644
index 0000000..cbb30af
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This {@code ITridentWindowManager} keeps the windowing tuples only in its {@code WindowManager} (in memory);
+ * trigger results are still written to the supplied {@code WindowsStore} by the base class.
+ */
+public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
+    private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+
+    public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                        BatchOutputCollector delegateCollector) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+    }
+
+    @Override
+    protected void initialize() {
+        log.debug("noop in initialize");
+    }
+
+    @Override
+    public List<TridentTuple> getTridentTuples(List<TridentTuple> tridentBatchTuples) {
+        return tridentBatchTuples;
+    }
+
+    @Override
+    public void onTuplesExpired(List<TridentTuple> expiredTuples) {
+        log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+    }
+
+    /**
+     * Adds every tuple of the batch to the window manager, unless the batch's transaction id is in
+     * {@code activeBatches}.
+     *
+     * @param batchId batch id; must be an {@code IBatchID}
+     * @param tuples tuples of the batch
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    @Override
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        // NOTE(review): neither this class nor AbstractTridentWindowManager adds to activeBatches, so this check
+        // never matches as written; confirm whether batches should be recorded here.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            log.info("Ignoring already added tuples with batch: {}", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: {}", batchId);
+        for (TridentTuple tridentTuple : tuples) {
+            windowManager.add(tridentTuple);
+        }
+    }
+
+    /**
+     * Returns the string form of the id of the given {@code IBatchID}.
+     *
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
new file mode 100644
index 0000000..02d78e7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * In-memory implementation of {@code WindowsStore} which can optionally be backed by a persistent store.
+ * <p>
+ * Without a backing store every entry is kept in memory and there is no size limit. With a backing store, all writes
+ * and removals are forwarded to it, at most {@code maxSize} entries are additionally cached in memory, reads that miss
+ * the cache fall through to the backing store, and {@link #getAllKeys()} is answered by the backing store.
+ */
+public class InMemoryWindowsStore implements WindowsStore, Serializable {
+
+    private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
+
+    // maxSize and currentSize are only consulted when a backing store is configured.
+    private int maxSize;
+    private AtomicInteger currentSize;
+    private WindowsStore backingStore;
+
+    /**
+     * Creates a purely in-memory store without a backing store and without a size limit.
+     */
+    public InMemoryWindowsStore() {
+    }
+
+    /**
+     * Creates a store which caches at most {@code maxSize} entries in memory in front of {@code backingStore}.
+     *
+     * @param maxSize maximum number of entries cached in memory
+     * @param backingStore backing store containing all the entries
+     */
+    public InMemoryWindowsStore(int maxSize, WindowsStore backingStore) {
+        this.maxSize = maxSize;
+        currentSize = new AtomicInteger();
+        this.backingStore = backingStore;
+    }
+
+    /**
+     * Returns the cached value of {@code key}, falling back to the backing store (if any) on a cache miss.
+     */
+    @Override
+    public Object get(String key) {
+        Object value = store.get(key);
+
+        if (value == null && backingStore != null) {
+            value = backingStore.get(key);
+        }
+
+        return value;
+    }
+
+    /**
+     * Looks up every key with {@link #get(String)}; values are returned in the same order as {@code keys}.
+     */
+    @Override
+    public Iterable<Object> get(List<String> keys) {
+        List<Object> values = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            values.add(get(key));
+        }
+        return values;
+    }
+
+    /**
+     * Returns all keys: from the backing store when one is configured (it receives every write), otherwise from
+     * memory. The returned iterators do not support {@code remove()}.
+     */
+    @Override
+    public Iterable<String> getAllKeys() {
+        if (backingStore != null) {
+            return backingStore.getAllKeys();
+        }
+
+        // A fresh enumeration is created for every iterator() call so the Iterable can be traversed more than once;
+        // a single shared iterator would be exhausted after the first loop.
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                final Enumeration<String> storeEnumeration = store.keys();
+                return new Iterator<String>() {
+                    @Override
+                    public boolean hasNext() {
+                        return storeEnumeration.hasMoreElements();
+                    }
+
+                    @Override
+                    public String next() {
+                        return storeEnumeration.nextElement();
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException("remove operation is not supported as it is immutable.");
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public void put(String key, Object value) {
+        _put(key, value);
+
+        if (backingStore != null) {
+            backingStore.put(key, value);
+        }
+    }
+
+    /**
+     * Puts the entry into the in-memory cache only (the backing store is handled by the callers).
+     */
+    private void _put(String key, Object value) {
+        // An entry that is already cached is always refreshed, even when the cache is full; otherwise get() would
+        // keep returning the stale cached value although the backing store already holds the new one.
+        if (!store.containsKey(key) && !canAdd()) {
+            return;
+        }
+
+        // Only genuinely new keys are counted, so overwrites do not inflate currentSize.
+        if (store.put(key, value) == null) {
+            incrementCurrentSize();
+        }
+    }
+
+    private void incrementCurrentSize() {
+        if (backingStore != null) {
+            currentSize.incrementAndGet();
+        }
+    }
+
+    private boolean canAdd() {
+        return backingStore == null || currentSize.get() < maxSize;
+    }
+
+    @Override
+    public void putAll(Collection<Entry> entries) {
+        for (Entry entry : entries) {
+            _put(entry.key, entry.value);
+        }
+        if (backingStore != null) {
+            backingStore.putAll(entries);
+        }
+    }
+
+    @Override
+    public void remove(String key) {
+        _remove(key);
+
+        if (backingStore != null) {
+            backingStore.remove(key);
+        }
+    }
+
+    /**
+     * Removes the entry from the in-memory cache only. Callers forward the removal to the backing store themselves;
+     * doing it here as well made every removal hit the backing store twice.
+     */
+    private void _remove(String key) {
+        if (store.remove(key) != null) {
+            decrementSize();
+        }
+    }
+
+    private void decrementSize() {
+        if (backingStore != null) {
+            currentSize.decrementAndGet();
+        }
+    }
+
+    @Override
+    public void removeAll(Collection<String> keys) {
+        for (String key : keys) {
+            _remove(key);
+        }
+
+        if (backingStore != null) {
+            backingStore.removeAll(keys);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        store.clear();
+        // keep the counter in sync with the now empty cache
+        if (currentSize != null) {
+            currentSize.set(0);
+        }
+
+        if (backingStore != null) {
+            backingStore.shutdown();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "InMemoryWindowsStore{" +
+                " store:size = " + store.size() +
+                " backingStore = " + backingStore +
+                '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
new file mode 100644
index 0000000..32027a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+/**
+ * Factory which lazily creates one {@code InMemoryWindowsStore} and hands that same instance out from every
+ * {@link #create()} call on this factory. The store is used for storing tuples and triggers of the window, and
+ * successfully emitted triggers can be removed from {@code StateUpdater}.
+ */
+public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
+
+    /** Created on the first {@link #create()} call and reused afterwards. */
+    private InMemoryWindowsStore inMemoryWindowsStore;
+
+    @Override
+    public WindowsStore create() {
+        if (inMemoryWindowsStore != null) {
+            return inMemoryWindowsStore;
+        }
+        inMemoryWindowsStore = new InMemoryWindowsStore();
+        return inMemoryWindowsStore;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
new file mode 100644
index 0000000..885e508
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This window manager uses {@code WindowsStore} for storing tuples and other trigger related information. Up to
+ * {@code maxCachedTuplesSize} tuples are kept in memory so they can be served without accessing the store; all other
+ * tuples are only referenced by their store key and loaded from the store when a window is processed.
+ */
+public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
+    private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+
+    private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
+
+    private final String windowTupleTaskId;
+    private final TridentTupleView.FreshOutputFactory freshOutputFactory;
+
+    // null means every tuple is cached in memory
+    private final Long maxCachedTuplesSize;
+    private final Fields inputFields;
+    // number of tuples currently in the window; new tuples are cached only while it is below maxCachedTuplesSize
+    private final AtomicLong currentCachedTuplesSize = new AtomicLong();
+
+    public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                          BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+
+        this.maxCachedTuplesSize = maxTuplesCacheSize;
+        this.inputFields = inputFields;
+        freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields);
+        windowTupleTaskId = TUPLE_PREFIX + windowTaskId;
+    }
+
+    /**
+     * Restores the tuples of this task from the store into the window manager and re-queues stored triggers which
+     * were not already attempted.
+     */
+    @Override
+    protected void initialize() {
+
+        // get existing tuples and pending/unsuccessful triggers for this operator-component/task and add them to WindowManager
+        String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId);
+        String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId);
+
+        List<String> attemptedTriggerKeys = new ArrayList<>();
+        List<String> triggerKeys = new ArrayList<>();
+
+        Iterable<String> allEntriesIterable = windowStore.getAllKeys();
+        for (String key : allEntriesIterable) {
+            if (key.startsWith(windowTupleTaskId)) {
+                int tupleIndexValue = lastPart(key);
+                String batchId = secondLastPart(key);
+                log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+                // The effective batch id must be the same key prefix addTuplesBatch() used (keyOf()), otherwise
+                // tupleKey() cannot rebuild this store key when the tuple is later loaded or expired.
+                windowManager.add(new TridentBatchTuple(effectiveBatchIdOf(key), System.currentTimeMillis(), tupleIndexValue));
+                // restored tuples are subtracted on expiry like any other tuple, so they must be counted here too
+                currentCachedTuplesSize.incrementAndGet();
+            } else if (key.startsWith(windowTriggerTaskId)) {
+                triggerKeys.add(key);
+                log.debug("Received trigger with key [{}]", key);
+            } else if (key.startsWith(windowTriggerInprocessId)) {
+                attemptedTriggerKeys.add(key);
+                log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key, windowStore);
+            }
+        }
+
+        // these triggers will be retried as part of batch retries
+        Set<Integer> triggersToBeIgnored = new HashSet<>();
+        Iterable<Object> attemptedTriggers = windowStore.get(attemptedTriggerKeys);
+        for (Object attemptedTrigger : attemptedTriggers) {
+            if (attemptedTrigger != null) {
+                triggersToBeIgnored.addAll((List<Integer>) attemptedTrigger);
+            }
+        }
+
+        // re-queue every stored trigger that is not retried through its batch; values come back in key order
+        Iterable<Object> triggerObjects = windowStore.get(triggerKeys);
+        int i = 0;
+        for (Object triggerObject : triggerObjects) {
+            int id = lastPart(triggerKeys.get(i++));
+            if (!triggersToBeIgnored.contains(id)) {
+                log.info("Adding pending trigger value [{}]", triggerObject);
+                pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
+            }
+        }
+
+    }
+
+    /** Parses the integer following the last {@code KEY_SEPARATOR} of {@code key}. */
+    private int lastPart(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        return Integer.parseInt(key.substring(lastSepIndex + 1));
+    }
+
+    /** Returns the part of {@code key} between its second-last and last {@code KEY_SEPARATOR}. */
+    private String secondLastPart(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        String trimKey = key.substring(0, lastSepIndex);
+        int secondLastSepIndex = trimKey.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (secondLastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+
+        return key.substring(secondLastSepIndex + 1, lastSepIndex);
+    }
+
+    /**
+     * Returns {@code key} up to and including its last {@code KEY_SEPARATOR}, i.e. the prefix {@link #keyOf(Object)}
+     * produced when the tuple was stored under {@code keyOf(batchId) + tupleIndex}.
+     */
+    private String effectiveBatchIdOf(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        // NOTE(review): like lastPart(), this assumes a one-character KEY_SEPARATOR.
+        return key.substring(0, lastSepIndex + 1);
+    }
+
+    /**
+     * Stores the tuples of a batch and then adds them to the window manager. A batch whose id is already in
+     * {@code activeBatches} is a replay and is ignored.
+     *
+     * @param batchId batch identifier, must be an {@code IBatchID}
+     * @param tuples tuples of the batch
+     */
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            log.info("Ignoring already added tuples with batch: {}", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: {}", batchId);
+        // the key prefix depends only on the batch, so compute it once
+        String key = keyOf(batchId);
+        List<WindowsStore.Entry> entries = new ArrayList<>(tuples.size());
+        for (int i = 0; i < tuples.size(); i++) {
+            entries.add(new WindowsStore.Entry(key + i, tuples.get(i).select(inputFields)));
+        }
+
+        // tuples should be available in store before they are added to window manager
+        windowStore.putAll(entries);
+
+        for (int i = 0; i < tuples.size(); i++) {
+            addToWindowManager(i, key, tuples.get(i));
+        }
+
+    }
+
+    /** Adds a tuple reference to the window manager, keeping the tuple itself only while the cache has room. */
+    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
+        TridentTuple actualTuple = null;
+        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
+            actualTuple = tridentTuple;
+        }
+        currentCachedTuplesSize.incrementAndGet();
+        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
+    }
+
+    /**
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+    /** Returns the store key prefix for tuples of {@code batchId}; the tuple index is appended to it. */
+    public String keyOf(Object batchId) {
+        return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR;
+    }
+
+    /**
+     * Resolves the given window entries to tuples, using cached tuples where available and loading the others from
+     * the store in a single call. The result keeps the order of {@code tridentBatchTuples}.
+     */
+    @Override
+    public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) {
+        List<TridentTuple> resultTuples = new ArrayList<>(tridentBatchTuples.size());
+        List<String> keys = new ArrayList<>();
+        // positions in resultTuples still to be filled with tuples loaded from the store
+        List<Integer> pendingPositions = new ArrayList<>();
+        for (TridentBatchTuple tridentBatchTuple : tridentBatchTuples) {
+            TridentTuple tuple = collectTridentTupleOrKey(tridentBatchTuple, keys);
+            if (tuple == null) {
+                pendingPositions.add(resultTuples.size());
+            }
+            resultTuples.add(tuple);
+        }
+
+        if (!keys.isEmpty()) {
+            Iterable<Object> storedTupleValues = windowStore.get(keys);
+            int i = 0;
+            for (Object storedTupleValue : storedTupleValues) {
+                TridentTuple tridentTuple = freshOutputFactory.create((List<Object>) storedTupleValue);
+                resultTuples.set(pendingPositions.get(i++), tridentTuple);
+            }
+        }
+
+        return resultTuples;
+    }
+
+    /**
+     * Returns the cached tuple of {@code tridentBatchTuple}, or appends its store key to {@code keys} and returns
+     * {@code null} when it is not cached.
+     */
+    public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) {
+        if (tridentBatchTuple.tridentTuple != null) {
+            return tridentBatchTuple.tridentTuple;
+        }
+        keys.add(tupleKey(tridentBatchTuple));
+        return null;
+    }
+
+    /** Removes expired tuples from the store and from the window tuple count. */
+    @Override
+    public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) {
+        // counted unconditionally, matching the unconditional increment in addToWindowManager()
+        currentCachedTuplesSize.addAndGet(-expiredTuples.size());
+
+        List<String> keys = new ArrayList<>(expiredTuples.size());
+        for (TridentBatchTuple expiredTuple : expiredTuples) {
+            keys.add(tupleKey(expiredTuple));
+        }
+
+        windowStore.removeAll(keys);
+    }
+
+    /** Rebuilds the store key {@code keyOf(batchId) + tupleIndex} under which the tuple was stored. */
+    private String tupleKey(TridentBatchTuple tridentBatchTuple) {
+        return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex;
+    }
+
+}