You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 07:11:24 UTC

[02/11] storm git commit: STORM-676 Upmerged and resolved conflicts

STORM-676 Upmerged and resolved conflicts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87a2d923
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87a2d923
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87a2d923

Branch: refs/heads/master
Commit: 87a2d92377413bca47258abb8e4ddab8fad9e8e2
Parents: a91abdb
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 12:20:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Mar 23 12:20:21 2016 +0530

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  44 ++-
 .../TridentHBaseWindowingStoreTopology.java     | 124 +++++++++
 .../TridentWindowingInmemoryStoreTopology.java  | 134 +++++++++
 .../trident/windowing/HBaseWindowsStore.java    | 275 +++++++++++++++++++
 .../windowing/HBaseWindowsStoreFactory.java     |  51 ++++
 storm-core/src/jvm/org/apache/storm/Config.java |   8 +
 .../jvm/org/apache/storm/trident/Stream.java    | 223 +++++++++++++--
 .../apache/storm/trident/TridentTopology.java   |   4 +
 .../storm/trident/fluent/UniqueIdGen.java       |  14 +-
 .../storm/trident/operation/builtin/Debug.java  |   4 +-
 .../windowing/AbstractTridentWindowManager.java | 241 ++++++++++++++++
 .../windowing/ITridentWindowManager.java        |  59 ++++
 .../windowing/InMemoryTridentWindowManager.java |  78 ++++++
 .../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++
 .../windowing/InMemoryWindowsStoreFactory.java  |  37 +++
 .../StoreBasedTridentWindowManager.java         | 223 +++++++++++++++
 .../trident/windowing/TridentBatchTuple.java    |  42 +++
 .../windowing/WindowTridentProcessor.java       | 260 ++++++++++++++++++
 .../storm/trident/windowing/WindowsState.java   |  52 ++++
 .../trident/windowing/WindowsStateFactory.java  |  40 +++
 .../trident/windowing/WindowsStateUpdater.java  |  81 ++++++
 .../storm/trident/windowing/WindowsStore.java   |  78 ++++++
 .../trident/windowing/WindowsStoreFactory.java  |  35 +++
 .../windowing/config/BaseWindowConfig.java      |  48 ++++
 .../windowing/config/SlidingCountWindow.java    |  40 +++
 .../windowing/config/SlidingDurationWindow.java |  42 +++
 .../windowing/config/TumblingCountWindow.java   |  39 +++
 .../config/TumblingDurationWindow.java          |  40 +++
 .../trident/windowing/config/WindowConfig.java  |  55 ++++
 .../windowing/strategy/BaseWindowStrategy.java  |  32 +++
 .../strategy/SlidingCountWindowStrategy.java    |  59 ++++
 .../strategy/SlidingDurationWindowStrategy.java |  60 ++++
 .../strategy/TumblingCountWindowStrategy.java   |  60 ++++
 .../TumblingDurationWindowStrategy.java         |  60 ++++
 .../windowing/strategy/WindowStrategy.java      |  45 +++
 .../strategy/WindowStrategyFactory.java         |  60 ++++
 .../apache/storm/windowing/TriggerHandler.java  |   2 +-
 .../storm/trident/TridentWindowingTest.java     | 110 ++++++++
 38 files changed, 3019 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 929c8ea..638d07a 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -31,10 +31,13 @@
   <name>storm-starter</name>
 
   <properties>
-     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-     <!-- see comment below... This fixes an annoyance with intellij -->
-     <provided.scope>provided</provided.scope>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <!-- see comment below... This fixes an annoyance with intellij -->
+    <provided.scope>provided</provided.scope>
+    <hbase.version>0.98.4-hadoop2</hbase.version>
+    <hbase.version>1.1.2</hbase.version>
   </properties>
+
   <profiles>
     <profile>
         <id>intellij</id>
@@ -149,6 +152,11 @@
       <artifactId>storm-hdfs</artifactId>
       <version>${project.version}</version>
     </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>storm-hbase</artifactId>
+          <version>${project.version}</version>
+      </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
@@ -177,6 +185,36 @@
       <artifactId>storm-redis</artifactId>
       <version>${project.version}</version>
     </dependency>
+      <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-log4j12</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.apache.zookeeper</groupId>
+                  <artifactId>zookeeper</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-log4j12</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.apache.zookeeper</groupId>
+                  <artifactId>zookeeper</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
new file mode 100644
index 0000000..24cc8d9
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+    private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+    public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+
+        Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+                new Split(), new Fields("word"))
+                .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//                .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+                .each(new Fields("count"), new Debug())
+                .each(new Fields("count"), new Echo(), new Fields("ct"));
+
+        return topology.build();
+    }
+
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    static class Echo implements Function {
+
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            log.info("##########Echo.execute: " + tuple);
+            collector.emit(tuple.getValues());
+        }
+
+        @Override
+        public void prepare(Map conf, TridentOperationContext context) {
+
+        }
+
+        @Override
+        public void cleanup() {
+
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+
+        // window-state table should already be created with cf:tuples column
+        HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            String topologyName = "wordCounterWithWindowing";
+            cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory));
+            Utils.sleep(120 * 1000);
+            cluster.killTopology(topologyName);
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
new file mode 100644
index 0000000..5f0cb4f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingInmemoryStoreTopology {
+    private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+
+    public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+
+        Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+                new Split(), new Fields("word"))
+                .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//                .aggregate(new CountAsAggregator(), new Fields("count"))
+                .each(new Fields("count"), new Debug())
+                .each(new Fields("count"), new Echo(), new Fields("ct"))
+                .each(new Fields("ct"), new Debug());
+
+        return topology.build();
+    }
+
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    static class Echo implements Function {
+
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            log.info("##########Echo.execute: " + tuple);
+            collector.emit(tuple.getValues());
+        }
+
+        @Override
+        public void prepare(Map conf, TridentOperationContext context) {
+
+        }
+
+        @Override
+        public void cleanup() {
+
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
+
+        if (args.length == 0) {
+            List<? extends WindowConfig> list = Arrays.asList(
+                    SlidingCountWindow.of(1000, 100)
+                    ,TumblingCountWindow.of(1000)
+                    ,SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
+                    ,TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
+            );
+
+            for (WindowConfig windowConfig : list) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
+                Utils.sleep(60 * 1000);
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
new file mode 100644
index 0000000..47879a4
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * {@link WindowsStore} implementation that keeps its entries in an HBase table.
+ *
+ * Keys are used as row keys (UTF-8 encoded) and values are serialized with Kryo into a single column,
+ * identified by the family and qualifier given at construction time. Each calling thread gets its own
+ * {@link HTable} instance through a {@link ThreadLocal}; all of them are closed by {@link #shutdown()}.
+ */
+public class HBaseWindowsStore implements WindowsStore {
+    private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+    public static final String UTF_8 = "utf-8";
+
+    private final ThreadLocal<HTable> threadLocalHtable;
+    // every HTable created by the thread local, kept so that shutdown() can close them all
+    private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    /**
+     * @param config    HBase configuration used to create the table handles
+     * @param tableName name of the table holding the entries
+     * @param family    column family of the value column
+     * @param qualifier column qualifier of the value column
+     */
+    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        threadLocalHtable = new ThreadLocal<HTable>() {
+            @Override
+            protected HTable initialValue() {
+                try {
+                    HTable hTable = new HTable(config, tableName);
+                    htables.add(hTable);
+                    return hTable;
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+    }
+
+    /** Returns the table handle of the calling thread, creating it on first use. */
+    private HTable htable() {
+        return threadLocalHtable.get();
+    }
+
+    /** Converts a store key into the row key bytes. */
+    private byte[] effectiveKey(String key) {
+        try {
+            return key.getBytes(UTF_8);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes the value together with its class using the given Kryo instance.
+     *
+     * The bytes are taken from the backing stream after an explicit flush. Reading only the internal
+     * buffer of a stream backed {@link Output} is not enough: that buffer is written out to the stream and
+     * restarted whenever it fills up, so it holds just the tail of values larger than the buffer.
+     */
+    private static byte[] serialize(Kryo kryo, Object value) {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        Output output = new Output(bytes);
+        kryo.writeClassAndObject(output, value);
+        output.flush();
+        return bytes.toByteArray();
+    }
+
+    /** Creates the Put storing the serialized value in the value column of the key's row. */
+    private Put createPut(Kryo kryo, String key, Object value) {
+        Put put = new Put(effectiveKey(key));
+        put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(serialize(kryo, value)));
+        return put;
+    }
+
+    /**
+     * Returns the value stored for the given key, or {@code null} when the row does not exist.
+     */
+    @Override
+    public Object get(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        byte[] effectiveKey = effectiveKey(key);
+        Get get = new Get(effectiveKey);
+        Result result = null;
+        try {
+            result = htable().get(get);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if(result.isEmpty()) {
+            return null;
+        }
+
+        Kryo kryo = new Kryo();
+        Input input = new Input(result.getValue(family, qualifier));
+        Object resultObject = kryo.readClassAndObject(input);
+        return resultObject;
+
+    }
+
+    /**
+     * Returns the values of the given keys, in the order of the keys.
+     *
+     * @throws RuntimeException when any of the keys has no row in the table
+     */
+    @Override
+    public Iterable<Object> get(List<String> keys) {
+        List<Get> gets = new ArrayList<>();
+        for (String key : keys) {
+            WindowsStore.Entry.nonNullCheckForKey(key);
+
+            byte[] effectiveKey = effectiveKey(key);
+            gets.add(new Get(effectiveKey));
+        }
+
+        Result[] results = null;
+        try {
+            results = htable().get(gets);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        Kryo kryo = new Kryo();
+        List<Object> values = new ArrayList<>();
+        for (int i=0; i<results.length; i++) {
+            Result result = results[i];
+            if(result.isEmpty()) {
+                log.error("Got empty result for key [{}]", keys.get(i));
+                throw new RuntimeException("Received empty result for key: "+keys.get(i));
+            }
+            Input input = new Input(result.getValue(family, qualifier));
+            Object resultObject = kryo.readClassAndObject(input);
+            values.add(resultObject);
+        }
+
+        return values;
+    }
+
+    /**
+     * Returns the keys of all rows of the table by scanning it lazily.
+     *
+     * The returned iterable wraps a single scan, so it can be traversed only once. The underlying scanner
+     * is closed as soon as the traversal reaches the end.
+     */
+    @Override
+    public Iterable<String> getAllKeys() {
+        Scan scan = new Scan();
+        // this filter makes sure to receive only Key or row but not values associated with those rows.
+        scan.setFilter(new FirstKeyOnlyFilter());
+        //scan.setCaching(1000);
+
+        // fully qualified to avoid touching the import section of the file
+        final org.apache.hadoop.hbase.client.ResultScanner scanner;
+        try {
+            scanner = htable().getScanner(scan);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        final Iterator<Result> resultIterator = scanner.iterator();
+
+        final Iterator<String> iterator = new Iterator<String>() {
+            @Override
+            public boolean hasNext() {
+                boolean hasMore = resultIterator.hasNext();
+                if (!hasMore) {
+                    // release the scanner once all rows have been consumed
+                    scanner.close();
+                }
+                return hasMore;
+            }
+
+            @Override
+            public String next() {
+                Result result = resultIterator.next();
+                String key = null;
+                try {
+                    key = new String(result.getRow(), UTF_8);
+                } catch (UnsupportedEncodingException e) {
+                    throw new RuntimeException(e);
+                }
+                return key;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove operation is not supported");
+            }
+        };
+
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                return iterator;
+            }
+        };
+    }
+
+    /**
+     * Stores the value under the given key, replacing the visible value of an existing row.
+     *
+     * @throws IllegalArgumentException when the value is {@code null}
+     */
+    @Override
+    public void put(String key, Object value) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+        WindowsStore.Entry.nonNullCheckForValue(value);
+
+        if(value == null) {
+            throw new IllegalArgumentException("Invalid value of null with key: "+key);
+        }
+        Put put = createPut(new Kryo(), key, value);
+        try {
+            htable().put(put);
+        } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Stores all the given entries with a single batched put.
+     */
+    @Override
+    public void putAll(Collection<Entry> entries) {
+        // one Kryo instance serves the whole batch, like in get(List)
+        Kryo kryo = new Kryo();
+        List<Put> list = new ArrayList<>();
+        for (Entry entry : entries) {
+            list.add(createPut(kryo, entry.key, entry.value));
+        }
+
+        try {
+            htable().put(list);
+        } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Deletes the row of the given key.
+     */
+    @Override
+    public void remove(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis());
+        try {
+            htable().delete(delete);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Deletes the rows of all the given keys with a single batched delete.
+     */
+    @Override
+    public void removeAll(Collection<String> keys) {
+        List<Delete> deleteBatch = new ArrayList<>();
+        for (String key : keys) {
+            WindowsStore.Entry.nonNullCheckForKey(key);
+
+            Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis());
+            deleteBatch.add(delete);
+        }
+        try {
+            htable().delete(deleteBatch);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Closes every table handle created by this store; failures are logged and do not stop the loop.
+     */
+    @Override
+    public void shutdown() {
+        // close all the created hTable instances
+        for (HTable htable : htables) {
+            try {
+                htable.close();
+            } catch (IOException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
new file mode 100644
index 0000000..56fad58
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+/**
+ * {@link WindowsStoreFactory} that creates {@link HBaseWindowsStore} instances for one table and value column.
+ */
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+    private final Map<String, Object> config;
+    private final String tableName;
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    /**
+     * @param config    properties copied into the HBase configuration of every created store
+     * @param tableName name of the table handed to the created stores
+     * @param family    column family handed to the created stores
+     * @param qualifier column qualifier handed to the created stores
+     */
+    public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
+        this.tableName = tableName;
+        this.family = family;
+        this.qualifier = qualifier;
+        this.config = config;
+    }
+
+    /**
+     * Creates a store on top of a fresh HBase configuration, overlaid with the string form of every
+     * non-null property given to this factory.
+     */
+    public WindowsStore create() {
+        final Configuration hbaseConf = HBaseConfiguration.create();
+        for (Map.Entry<String, Object> property : config.entrySet()) {
+            final Object propertyValue = property.getValue();
+            if (propertyValue == null) {
+                // properties without a value are not copied
+                continue;
+            }
+            hbaseConf.set(property.getKey(), propertyValue.toString());
+        }
+        return new HBaseWindowsStore(hbaseConf, tableName, family, qualifier);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6ea8b0f..62edc2b 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1971,6 +1971,14 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
 
     /**
+     * Maximum number of tuples that can be kept in the in-memory cache of windowing operators for fast access without
+     * fetching them from the store.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT="topology.trident.windowing.cache.tuple.limit";
+
+    /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
      */
     @isString

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 4a51b56..b3a4446 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.topology.ResourceDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
@@ -68,10 +69,22 @@ import org.apache.storm.trident.state.StateSpec;
 import org.apache.storm.trident.state.StateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowTridentProcessor;
+import org.apache.storm.trident.windowing.WindowsStateFactory;
+import org.apache.storm.trident.windowing.WindowsStateUpdater;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
 /**
  * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
@@ -92,10 +105,10 @@ import java.util.Comparator;
  */
 // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
 public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
-    Node _node;
-    TridentTopology _topology;
-    String _name;
-    
+    final Node _node;
+    final String _name;
+    private final TridentTopology _topology;
+
     protected Stream(TridentTopology topology, String name, Node node) {
         _topology = topology;
         _node = node;
@@ -180,7 +193,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      */
     public GroupedStream groupBy(Fields fields) {
         projectionValidation(fields);
-        return new GroupedStream(this, fields);        
+        return new GroupedStream(this, fields);
     }
 
     /**
@@ -287,7 +300,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         if(_node instanceof PartitionNode) {
             return each(new Fields(), new TrueFilter()).partition(grouping);
         } else {
-            return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));       
+            return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
         }
     }
 
@@ -323,7 +336,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
                     functionFields,
                     new AggregateProcessor(inputFields, agg)));
     }
-    
+
     public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
         projectionValidation(inputFields);
         String stateId = state._node.stateInfo.id;
@@ -335,11 +348,11 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         _topology._colocate.get(stateId).add(n);
         return _topology.addSourcedNode(this, n);
     }
-    
+
     public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields) {
       return partitionPersist(new StateSpec(stateFactory), inputFields, updater, functionFields);
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {
         projectionValidation(inputFields);
         String id = _topology.getUniqueStateId();
@@ -352,19 +365,19 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         n.stateInfo = new NodeStateInfo(id, stateSpec);
         return _topology.addSourcedStateNode(this, n);
     }
-    
+
     public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater) {
       return partitionPersist(stateFactory, inputFields, updater, new Fields());
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater) {
-      return partitionPersist(stateSpec, inputFields, updater, new Fields());        
+      return partitionPersist(stateSpec, inputFields, updater, new Fields());
     }
-    
+
     public Stream each(Function function, Fields functionFields) {
         return each(null, function, functionFields);
     }
-    
+
     public Stream each(Fields inputFields, Filter filter) {
         return each(inputFields, new FilterExecutor(filter), new Fields());
     }
@@ -448,7 +461,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
     }
-    
+
     public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
         return partitionAggregate(null, agg, functionFields);
     }
@@ -462,8 +475,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         return chainedAgg()
                .partitionAggregate(inputFields, agg, functionFields)
                .chainEnd();
-    }  
-    
+    }
+
     public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
         return partitionAggregate(null, agg, functionFields);
     }
@@ -565,7 +578,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public Stream aggregate(Aggregator agg, Fields functionFields) {
         return aggregate(null, agg, functionFields);
     }
-    
+
     public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
         projectionValidation(inputFields);
         return chainedAgg()
@@ -594,19 +607,169 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
                 .aggregate(inputFields, agg, functionFields)
                 .chainEnd();
     }
-    
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents window tuples count
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents no of tuples in the window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window with {@code slideCount}.
+     *
+     * @param windowCount represents tuples count of a window
+     * @param slideCount represents sliding count window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window that tumbles at a duration of {@code windowDuration}.
+     *
+     * @param windowDuration represents tumbling window duration configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+     * and completes a window at {@code windowDuration}
+     *
+     * @param windowDuration represents window duration configuration
+     * @param slideDuration represents sliding duration  configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+                                    WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return
+     */
+    public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        // this store is used only for storing triggered aggregated results but not tuples, as storeTuplesInStore is set
+        // to false in the call below.
+        InMemoryWindowsStoreFactory inMemoryWindowsStoreFactory = new InMemoryWindowsStoreFactory();
+        return window(windowConfig, inMemoryWindowsStoreFactory, inputFields, aggregator, functionFields, false);
+    }
+
+    /**
+     * Returns stream of aggregated results based on the given window configuration.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param windowStoreFactory intermediary tuple store for storing tuples for windowing
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return
+     */
+    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+                         Aggregator aggregator, Fields functionFields) {
+        return window(windowConfig, windowStoreFactory, inputFields, aggregator, functionFields, true);
+    }
+
+    private Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator,
+                          Fields functionFields, boolean storeTuplesInStore) {
+        projectionValidation(inputFields);
+        windowConfig.validate();
+
+        Fields fields = addTriggerField(functionFields);
+
+        // when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
+        // that store is passed to WindowsStateUpdater to remove them after committing the batch.
+        Stream stream = _topology.addSourcedNode(this,
+                new ProcessorNode(_topology.getUniqueStreamId(),
+                        _name,
+                        fields,
+                        fields,
+                        new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
+                                inputFields, aggregator, storeTuplesInStore)));
+
+        Stream effectiveStream = stream.project(functionFields);
+
+        // create StateUpdater with the given windowStoreFactory to remove triggered aggregation results from the store
+        // when they are successfully processed.
+        StateFactory stateFactory = new WindowsStateFactory();
+        StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
+        stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
+
+        return effectiveStream;
+    }
+
+    private Fields addTriggerField(Fields functionFields) {
+        List<String> fieldsList = new ArrayList<>();
+        fieldsList.add(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+        for (String field : functionFields) {
+            fieldsList.add(field);
+        }
+        return new Fields(fieldsList);
+    }
+
     public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater, Fields functionFields) {
         return partitionPersist(new StateSpec(stateFactory), updater, functionFields);
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater, Fields functionFields) {
         return partitionPersist(stateSpec, null, updater, functionFields);
     }
-    
+
     public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater) {
         return partitionPersist(stateFactory, updater, new Fields());
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater) {
         return partitionPersist(stateSpec, updater, new Fields());
     }
@@ -622,7 +785,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
         return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
     }
-    
+
     public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
         projectionValidation(inputFields);
         // replaces normal aggregation here with a global grouping because it needs to be consistent across batches 
@@ -648,7 +811,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         projectionValidation(inputFields);
         return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
     }
-    
+
     public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
         return stateQuery(state, null, function, functionFields);
     }
@@ -662,7 +825,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public Fields getOutputFields() {
         return _node.allOutputFields;
     }
-    
+
     static class BatchGlobalAggScheme implements GlobalAggregationScheme<Stream> {
 
         @Override
@@ -674,9 +837,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         public BatchToPartition singleEmitPartitioner() {
             return new IndexHashBatchToPartition();
         }
-        
+
     }
-    
+
     static class GlobalAggScheme implements GlobalAggregationScheme<Stream> {
 
         @Override
@@ -688,7 +851,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         public BatchToPartition singleEmitPartitioner() {
             return new GlobalBatchToPartition();
         }
-        
+
     }
 
     private void projectionValidation(Fields projFields) {

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index e0a349b..06e1576 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -829,6 +829,10 @@ public class TridentTopology {
     protected String getUniqueStateId() {
         return _gen.getUniqueStateId();
     }
+
+    protected String getUniqueWindowId() {
+        return _gen.getUniqueWindowId();
+    }
     
     protected void registerNode(Node n) {
         _graph.addVertex(n);

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
index 1364faf..e063142 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
@@ -18,17 +18,21 @@
 package org.apache.storm.trident.fluent;
 
 public class UniqueIdGen {
-    int _streamCounter = 0;
-    
+    private int _streamCounter = 0;
+    private int _stateCounter = 0;
+    private int windowCounter = 0;
+
     public String getUniqueStreamId() {
         _streamCounter++;
         return "s" + _streamCounter;
     }
 
-    int _stateCounter = 0;
-    
     public String getUniqueStateId() {
         _stateCounter++;
         return "state" + _stateCounter;
-    }    
+    }
+
+    public String getUniqueWindowId() {
+        return "w"+ (++windowCounter);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 07c5ae4..87c4167 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -20,6 +20,8 @@ package org.apache.storm.trident.operation.builtin;
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.Date;
+
 /**
  * Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`.
  */
@@ -40,7 +42,7 @@ public class Debug extends BaseFilter {
 
     @Override
     public boolean isKeep(TridentTuple tuple) {
-        System.out.println(name + tuple.toString());
+        System.out.println("<"+new Date()+"> "+name + tuple.toString());
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
new file mode 100644
index 0000000..2941e28
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ *
+ */
+public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+    protected final WindowManager<T> windowManager;
+    protected final Aggregator aggregator;
+    protected final BatchOutputCollector delegateCollector;
+    protected final String windowTaskId;
+    protected final WindowsStore windowStore;
+
+    protected final Set<String> activeBatches = new HashSet<>();
+    protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
+    protected final AtomicInteger triggerId = new AtomicInteger();
+    private final String windowTriggerCountId;
+    private final TriggerPolicy<T> triggerPolicy;
+
+    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
+        this.windowTaskId = windowTaskId;
+        this.windowStore = windowStore;
+        this.aggregator = aggregator;
+        this.delegateCollector = delegateCollector;
+
+        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+        WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
+        windowManager.setEvictionPolicy(evictionPolicy);
+        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+        windowManager.setTriggerPolicy(triggerPolicy);
+    }
+
+    @Override
+    public void prepare() {
+        preInitialize();
+
+        initialize();
+
+        postInitialize();
+    }
+
+    private void preInitialize() {
+        log.debug("Getting current trigger count for this component/task");
+        // get trigger count value from store
+        Object result = windowStore.get(windowTriggerCountId);
+        Integer currentCount = 0;
+        if(result == null) {
+            log.info("No current trigger count in windows store.");
+        } else {
+            currentCount = (Integer) result + 1;
+        }
+        windowStore.put(windowTriggerCountId, currentCount);
+        triggerId.set(currentCount);
+    }
+
+    private void postInitialize() {
+        // start trigger once the initialization is done.
+        triggerPolicy.start();
+    }
+
+    /**
+     * Load and initialize any resources into window manager before windowing for component/task is activated.
+     */
+    protected abstract void initialize();
+
+    /**
+     * Listener to receive any activation/expiry of windowing events and take further action on them.
+     */
+    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
+
+        @Override
+        public void onExpiry(List<T> expiredEvents) {
+            log.debug("onExpiry is invoked");
+            onTuplesExpired(expiredEvents);
+        }
+
+        @Override
+        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
+            log.debug("onActivation is invoked with events size: {}", events.size());
+            // trigger occurred, create an aggregation and keep them in store
+            int currentTriggerId = triggerId.incrementAndGet();
+            execAggregatorAndStoreResult(currentTriggerId, events);
+        }
+    }
+
+    /**
+     * Handle expired tuple events, for example by removing them from cache or store.
+     *
+     * @param expiredEvents
+     */
+    protected abstract void onTuplesExpired(List<T> expiredEvents);
+
+    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
+        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
+
+        // run aggregator to compute the result
+        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
+        Object state = aggregator.init(currentTriggerId, collector);
+        for (TridentTuple resultTuple : resultTuples) {
+            aggregator.aggregate(state, resultTuple, collector);
+        }
+        aggregator.complete(state, collector);
+
+        List<List<Object>> resultantAggregatedValue = collector.values;
+
+        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
+                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
+        windowStore.putAll(entries);
+
+        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
+    }
+
+    /**
+     * Return {@code TridentTuple}s from given {@code tupleEvents}.
+     * @param tupleEvents
+     * @return
+     */
+    protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents);
+
+    /**
+     * This {@code TridentCollector} accumulates all the values emitted.
+     */
+    static class AccumulatedTuplesCollector implements TridentCollector {
+
+        final List<List<Object>> values = new ArrayList<>();
+        private final BatchOutputCollector delegateCollector;
+
+        public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) {
+            this.delegateCollector = delegateCollector;
+        }
+
+        @Override
+        public void emit(List<Object> values) {
+            this.values.add(values);
+        }
+
+        @Override
+        public void reportError(Throwable t) {
+            delegateCollector.reportError(t);
+        }
+
+    }
+
+    static class TriggerResult {
+        final int id;
+        final List<List<Object>> result;
+
+        public TriggerResult(int id, List<List<Object>> result) {
+            this.id = id;
+            this.result = result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof TriggerResult)) return false;
+
+            TriggerResult that = (TriggerResult) o;
+
+            return id == that.id;
+
+        }
+
+        @Override
+        public int hashCode() {
+            return id;
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerResult{" +
+                    "id=" + id +
+                    ", result=" + result +
+                    '}';
+        }
+    }
+
+    public Queue<TriggerResult> getPendingTriggers() {
+        return pendingTriggers;
+    }
+
+    public void shutdown() {
+        try {
+            log.info("window manager [{}] is being shutdown", windowManager);
+            windowManager.shutdown();
+        } finally {
+            log.info("window store [{}] is being shutdown", windowStore);
+            windowStore.shutdown();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
new file mode 100644
index 0000000..d4aef79
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Window manager to handle trident tuple events.
+ */
+public interface ITridentWindowManager {
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s prepare method, so any
+     * initialization tasks can be done before the topology starts accepting tuples. For example:
+     * initialize the window manager with any earlier stored tuples/triggers and start {@code WindowManager}.
+     */
+    public void prepare();
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s cleanup method, so any
+     * cleanup operations like clearing the cache or closing the store connection can be done here.
+     */
+    public void shutdown();
+
+    /**
+     * Add received batch of tuples to cache/store and add them to {@code WindowManager}.
+     *
+     * @param batchId identifier of the batch the tuples belong to; {@code InMemoryTridentWindowManager} and
+     *                {@code StoreBasedTridentWindowManager} reject values that are not {@code IBatchID} instances
+     * @param tuples  tuples of the given batch
+     */
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples);
+
+    /**
+     * Returns pending triggers to be emitted.
+     *
+     * @return queue of trigger results which are pending to be emitted
+     */
+    public Queue<StoreBasedTridentWindowManager.TriggerResult> getPendingTriggers();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
new file mode 100644
index 0000000..cbb30af
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information in memory.
+ * Tuples are handed to the window manager as they are, without any store key indirection.
+ */
+public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
+    private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+
+    /**
+     * Creates the manager; all arguments are passed to the superclass constructor unchanged.
+     */
+    public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                        BatchOutputCollector delegateCollector) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+    }
+
+    /**
+     * Does nothing apart from logging, this implementation restores no state.
+     */
+    @Override
+    protected void initialize() {
+        log.debug("noop in initialize");
+    }
+
+    /**
+     * Returns the given list itself as the window events already are the trident tuples.
+     *
+     * @param tridentBatchTuples tuples held by the window
+     * @return the same list instance
+     */
+    @Override
+    public List<TridentTuple> getTridentTuples(List<TridentTuple> tridentBatchTuples) {
+        return tridentBatchTuples;
+    }
+
+    /**
+     * Only logs the call; expired tuples need no further cleanup here.
+     *
+     * @param expiredTuples tuples which are expired from the window
+     */
+    @Override
+    public void onTuplesExpired(List<TridentTuple> expiredTuples) {
+        log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+    }
+
+    /**
+     * Adds every tuple of the batch to the window manager unless the batch is already active.
+     *
+     * @param batchId batch identifier, must be an {@code IBatchID}
+     * @param tuples  tuples of the batch
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            // slf4j substitutes only "{}" placeholders, "%s" would be logged literally
+            log.info("Ignoring already added tuples with batch: {}", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: {}", batchId);
+        for (TridentTuple tridentTuple : tuples) {
+            windowManager.add(tridentTuple);
+        }
+    }
+
+    /**
+     * Returns the string form of the id of the given batch.
+     *
+     * @param batchId batch identifier, must be an {@code IBatchID}
+     * @return {@code toString()} of the id returned by {@code IBatchID#getId()}
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
new file mode 100644
index 0000000..02d78e7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * In-memory implementation of {@code WindowsStore} which can optionally be backed by another
+ * (e.g. persistent) {@code WindowsStore}.
+ * <p>
+ * Without a backing store every entry is kept in the in-memory map. With a backing store, writes and
+ * removals are applied to both stores, the in-memory map stops accepting new keys once {@code maxSize}
+ * entries are held, and reads fall back to the backing store when the key is not held in memory.
+ */
+public class InMemoryWindowsStore implements WindowsStore, Serializable {
+
+    private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
+
+    private final int maxSize;
+    // number of entries held in the in-memory map; it is maintained only when a backing store is present
+    private final AtomicInteger currentSize = new AtomicInteger();
+    private final WindowsStore backingStore;
+
+    /**
+     * Creates a store without any backing store and without a size limit.
+     */
+    public InMemoryWindowsStore() {
+        this(0, null);
+    }
+
+    /**
+     *
+     * @param maxSize maximum size of inmemory store
+     * @param backingStore backing store containing the entries
+     */
+    public InMemoryWindowsStore(int maxSize, WindowsStore backingStore) {
+        this.maxSize = maxSize;
+        this.backingStore = backingStore;
+    }
+
+    /**
+     * Returns the value held in memory for the key, or the value of the backing store (if there is one)
+     * when the key is not held in memory.
+     */
+    @Override
+    public Object get(String key) {
+        Object value = store.get(key);
+
+        if (value == null && backingStore != null) {
+            value = backingStore.get(key);
+        }
+
+        return value;
+    }
+
+    /**
+     * Looks up each key with {@link #get(String)}; the result has one element per key, in the order of the
+     * given keys, and contains {@code null} for keys without a value.
+     */
+    @Override
+    public Iterable<Object> get(List<String> keys) {
+        List<Object> values = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            values.add(get(key));
+        }
+        return values;
+    }
+
+    /**
+     * Returns the keys of the backing store when there is one, otherwise the keys held in memory.
+     */
+    @Override
+    public Iterable<String> getAllKeys() {
+        if (backingStore != null) {
+            return backingStore.getAllKeys();
+        }
+
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                // a fresh enumeration for each call so that the returned Iterable can be traversed more than once
+                final Enumeration<String> storeEnumeration = store.keys();
+                return new Iterator<String>() {
+                    @Override
+                    public boolean hasNext() {
+                        return storeEnumeration.hasMoreElements();
+                    }
+
+                    @Override
+                    public String next() {
+                        return storeEnumeration.nextElement();
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException("remove operation is not supported as it is immutable.");
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public void put(String key, Object value) {
+        _put(key, value);
+
+        if (backingStore != null) {
+            backingStore.put(key, value);
+        }
+    }
+
+    /**
+     * Updates only the in-memory map. With a backing store, a key which is already held is always
+     * refreshed, and a new key is added only while there is room.
+     */
+    private void _put(String key, Object value) {
+        if (backingStore == null) {
+            store.put(key, value);
+            return;
+        }
+
+        // refresh an already held key even when the map is full, otherwise get() would return the old value
+        if (store.replace(key, value) != null) {
+            return;
+        }
+
+        // count the entry only when it is really new, so that overwrites do not inflate the size
+        if (currentSize.get() < maxSize && store.put(key, value) == null) {
+            currentSize.incrementAndGet();
+        }
+    }
+
+    @Override
+    public void putAll(Collection<Entry> entries) {
+        for (Entry entry : entries) {
+            _put(entry.key, entry.value);
+        }
+        if (backingStore != null) {
+            backingStore.putAll(entries);
+        }
+    }
+
+    @Override
+    public void remove(String key) {
+        _remove(key);
+
+        if (backingStore != null) {
+            backingStore.remove(key);
+        }
+    }
+
+    /**
+     * Removes the key only from the in-memory map; callers remove it from the backing store exactly once.
+     */
+    private void _remove(String key) {
+        if (store.remove(key) != null && backingStore != null) {
+            currentSize.decrementAndGet();
+        }
+    }
+
+    @Override
+    public void removeAll(Collection<String> keys) {
+        for (String key : keys) {
+            _remove(key);
+        }
+
+        // single bulk call instead of one backing store call per key
+        if (backingStore != null) {
+            backingStore.removeAll(keys);
+        }
+    }
+
+    /**
+     * Clears the in-memory map and shuts down the backing store, if any.
+     */
+    @Override
+    public void shutdown() {
+        store.clear();
+
+        if (backingStore != null) {
+            backingStore.shutdown();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "InMemoryWindowsStore{" +
+                " store:size = " + store.size() +
+                " backingStore = " + backingStore +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
new file mode 100644
index 0000000..32027a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+/**
+ * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ * The store is created on the first {@link #create()} call and the same object is returned by later calls.
+ */
+public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
+
+    private InMemoryWindowsStore inMemoryWindowsStore;
+
+    @Override
+    public WindowsStore create() {
+        if (inMemoryWindowsStore != null) {
+            return inMemoryWindowsStore;
+        }
+
+        // first call: build the store and remember it for the following calls
+        inMemoryWindowsStore = new InMemoryWindowsStore();
+        return inMemoryWindowsStore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/87a2d923/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
new file mode 100644
index 0000000..885e508
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This window manager uses {@code WindowsStore} for storing tuples and other trigger related information. It maintains
+ * tuples cache of {@code maxCachedTuplesSize} without accessing store for getting them.
+ *
+ */
+public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
+    private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+
+    private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
+
+    // prefix of the store keys of all the tuples of this task
+    private final String windowTupleTaskId;
+    private final TridentTupleView.FreshOutputFactory freshOutputFactory;
+
+    // null means that there is no limit on the tuples kept in memory
+    private final Long maxCachedTuplesSize;
+    private final Fields inputFields;
+    // incremented for every tuple given to the window manager (kept in memory or not) and decremented on expiry
+    private final AtomicLong currentCachedTuplesSize = new AtomicLong();
+
+    /**
+     * @param maxTuplesCacheSize tuples are kept in memory only while the counter of added tuples is below this
+     *                           value; {@code null} keeps every tuple in memory
+     * @param inputFields        fields which are selected from each tuple before it is written to the store
+     */
+    public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                          BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+
+        this.maxCachedTuplesSize = maxTuplesCacheSize;
+        this.inputFields = inputFields;
+        freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields);
+        windowTupleTaskId = TUPLE_PREFIX + windowTaskId;
+    }
+
+    /**
+     * Reads all the keys of the store and restores the state of this task: stored tuples are added to the window
+     * manager, and stored triggers are added to {@code pendingTriggers} unless their id is listed in an in-process
+     * trigger entry.
+     */
+    @Override
+    protected void initialize() {
+
+        // get existing tuples and pending/unsuccessful triggers for this operator-component/task and add them to WindowManager
+        String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId);
+        String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId);
+
+        List<String> attemptedTriggerKeys = new ArrayList<>();
+        List<String> triggerKeys = new ArrayList<>();
+
+        Iterable<String> allEntriesIterable = windowStore.getAllKeys();
+        for (String key : allEntriesIterable) {
+            if (key.startsWith(windowTupleTaskId)) {
+                int tupleIndexValue = lastPart(key);
+                String batchId = secondLastPart(key);
+                log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+                // NOTE(review): only the batch id part of the key is passed here, whereas addTuplesBatch passes the
+                // whole key prefix returned by keyOf(); tupleKey() concatenates effectiveBatchId and tupleIndex, so
+                // confirm that restored tuples still resolve to their store key.
+                windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue));
+            } else if (key.startsWith(windowTriggerTaskId)) {
+                triggerKeys.add(key);
+                log.debug("Received trigger with key [{}]", key);
+            } else if (key.startsWith(windowTriggerInprocessId)) {
+                attemptedTriggerKeys.add(key);
+                log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key, windowStore);
+            }
+        }
+
+        // these triggers will be retried as part of batch retries
+        Set<Integer> triggersToBeIgnored = new HashSet<>();
+        Iterable<Object> attemptedTriggers = windowStore.get(attemptedTriggerKeys);
+        for (Object attemptedTrigger : attemptedTriggers) {
+            triggersToBeIgnored.addAll((List<Integer>) attemptedTrigger);
+        }
+
+        // get trigger values only if they have more than zero
+        Iterable<Object> triggerObjects = windowStore.get(triggerKeys);
+        // values are matched with their keys by position
+        int i = 0;
+        for (Object triggerObject : triggerObjects) {
+            int id = lastPart(triggerKeys.get(i++));
+            if (!triggersToBeIgnored.contains(id)) {
+                log.info("Adding pending trigger value [{}]", triggerObject);
+                pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
+            }
+        }
+
+    }
+
+    /**
+     * Returns the integer which follows the last key separator of the given key.
+     *
+     * @throws IllegalArgumentException if the key has no separator or the last part is not an integer
+     */
+    private int lastPart(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        return Integer.parseInt(key.substring(lastSepIndex + 1));
+    }
+
+    /**
+     * Returns the part of the given key which is between its last two key separators.
+     *
+     * @throws IllegalArgumentException if the key has less than two separators
+     */
+    private String secondLastPart(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        String trimKey = key.substring(0, lastSepIndex);
+        int secondLastSepIndex = trimKey.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        // the index of the second separator has to be checked here, not the first one again
+        if (secondLastSepIndex < 0) {
+            throw new IllegalArgumentException("key " + key + " does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+
+        return key.substring(secondLastSepIndex + 1, lastSepIndex);
+    }
+
+    /**
+     * Writes the tuples of the batch to the store and then adds them to the window manager, unless the batch
+     * is already active.
+     *
+     * @param batchId batch identifier, must be an {@code IBatchID}
+     * @param tuples  tuples of the batch
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            // slf4j substitutes only "{}" placeholders, "%s" would be logged literally
+            log.info("Ignoring already added tuples with batch: {}", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: {}", batchId);
+        // the key prefix is the same for all the tuples of the batch, the tuple index is appended to it
+        String key = keyOf(batchId);
+        List<WindowsStore.Entry> entries = new ArrayList<>(tuples.size());
+        for (int i = 0; i < tuples.size(); i++) {
+            TridentTuple tridentTuple = tuples.get(i);
+            entries.add(new WindowsStore.Entry(key + i, tridentTuple.select(inputFields)));
+        }
+
+        // tuples should be available in store before they are added to window manager
+        windowStore.putAll(entries);
+
+        for (int i = 0; i < tuples.size(); i++) {
+            addToWindowManager(i, key, tuples.get(i));
+        }
+
+    }
+
+    /**
+     * Adds a window event for the tuple. The tuple itself is attached to the event only while the limit of
+     * {@code maxCachedTuplesSize} is not reached; otherwise the event carries just the key information.
+     */
+    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
+        TridentTuple actualTuple = null;
+        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
+            actualTuple = tridentTuple;
+        }
+        currentCachedTuplesSize.incrementAndGet();
+        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
+    }
+
+    /**
+     * Returns the string form of the id of the given batch.
+     *
+     * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
+     */
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+    /**
+     * Returns the store key prefix of the tuples of the given batch: the tuple prefix of this task followed by
+     * the batch txn id and a key separator.
+     */
+    public String keyOf(Object batchId) {
+        return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR;
+    }
+
+    /**
+     * Resolves window events to trident tuples. Tuples attached to the events come first in the result, followed
+     * by the tuples which are loaded from the store for the remaining events.
+     */
+    @Override
+    public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) {
+        List<TridentTuple> resultTuples = new ArrayList<>();
+        List<String> keys = new ArrayList<>();
+        for (TridentBatchTuple tridentBatchTuple : tridentBatchTuples) {
+            TridentTuple tuple = collectTridentTupleOrKey(tridentBatchTuple, keys);
+            if (tuple != null) {
+                resultTuples.add(tuple);
+            }
+        }
+
+        if (!keys.isEmpty()) {
+            Iterable<Object> storedTupleValues = windowStore.get(keys);
+            for (Object storedTupleValue : storedTupleValues) {
+                TridentTuple tridentTuple = freshOutputFactory.create((List<Object>) storedTupleValue);
+                resultTuples.add(tridentTuple);
+            }
+        }
+
+        return resultTuples;
+    }
+
+    /**
+     * Returns the tuple attached to the event; when there is none, the store key of the event is added to
+     * {@code keys} and {@code null} is returned.
+     */
+    public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) {
+        if (tridentBatchTuple.tridentTuple != null) {
+            return tridentBatchTuple.tridentTuple;
+        }
+        keys.add(tupleKey(tridentBatchTuple));
+        return null;
+    }
+
+    /**
+     * Reduces the tuples counter (when a limit is configured) and removes the expired tuples from the store.
+     */
+    @Override
+    public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) {
+        if (maxCachedTuplesSize != null) {
+            currentCachedTuplesSize.addAndGet(-expiredTuples.size());
+        }
+
+        List<String> keys = new ArrayList<>(expiredTuples.size());
+        for (TridentBatchTuple expiredTuple : expiredTuples) {
+            keys.add(tupleKey(expiredTuple));
+        }
+
+        windowStore.removeAll(keys);
+    }
+
+    private String tupleKey(TridentBatchTuple tridentBatchTuple) {
+        return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex;
+    }
+
+}