You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 08:07:13 UTC

[1/9] storm git commit: STORM-676 Upmerged and resolved conflicts

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 1783d7ae9 -> b0db246ad


http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
new file mode 100644
index 0000000..13643db
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/TridentBatchTuple.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ *
+ */
+public class TridentBatchTuple {
+    final String effectiveBatchId;
+    final long timeStamp;
+    final int tupleIndex;
+    final TridentTuple tridentTuple;
+
+    public TridentBatchTuple(String effectiveBatchId, long timeStamp, int tupleIndex) {
+        this(effectiveBatchId, timeStamp, tupleIndex, null);
+    }
+
+    public TridentBatchTuple(String effectiveBatchId, long timeStamp, int tupleIndex, TridentTuple tridentTuple) {
+        this.effectiveBatchId = effectiveBatchId;
+        this.timeStamp = timeStamp;
+        this.tupleIndex = tupleIndex;
+        this.tridentTuple = tridentTuple;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
new file mode 100644
index 0000000..c2d9362
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.planner.processor.FreshCollector;
+import org.apache.storm.trident.planner.processor.TridentContext;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.ConsList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * {@code TridentProcessor} implementation for windowing operations on trident stream.
+ *
+ */
+public class WindowTridentProcessor implements TridentProcessor {
+    private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class);
+
+    public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
+
+    public static final String TRIGGER_FIELD_NAME = "_task_info";
+    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
+
+    private final String windowId;
+    private final Fields inputFields;
+    private final Aggregator aggregator;
+    private final boolean storeTuplesInStore;
+
+    private String windowTriggerInprocessId;
+    private WindowConfig windowConfig;
+    private WindowsStoreFactory windowStoreFactory;
+    private WindowsStore windowStore;
+
+    private Map conf;
+    private TopologyContext topologyContext;
+    private FreshCollector collector;
+    private TridentTupleView.ProjectionFactory projection;
+    private TridentContext tridentContext;
+    private ITridentWindowManager tridentWindowManager;
+    private String windowTaskId;
+
+    public WindowTridentProcessor(WindowConfig windowConfig, String uniqueWindowId, WindowsStoreFactory windowStoreFactory,
+                                  Fields inputFields, Aggregator aggregator, boolean storeTuplesInStore) {
+
+        this.windowConfig = windowConfig;
+        this.windowId = uniqueWindowId;
+        this.windowStoreFactory = windowStoreFactory;
+        this.inputFields = inputFields;
+        this.aggregator = aggregator;
+        this.storeTuplesInStore = storeTuplesInStore;
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
+        this.conf = conf;
+        this.topologyContext = context;
+        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
+        if (parents.size() != 1) {
+            throw new RuntimeException("Aggregation related operation can only have one parent");
+        }
+
+        Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf);
+
+        this.tridentContext = tridentContext;
+        collector = new FreshCollector(tridentContext);
+        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
+
+        windowStore = windowStoreFactory.create();
+        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
+        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
+
+        tridentWindowManager = storeTuplesInStore ?
+                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
+                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
+
+        tridentWindowManager.prepare();
+    }
+
+    public static String getWindowTriggerInprocessIdPrefix(String windowTaskId) {
+        return TRIGGER_INPROCESS_PREFIX + windowTaskId;
+    }
+
+    public static String getWindowTriggerTaskPrefix(String windowTaskId) {
+        return TRIGGER_PREFIX + windowTaskId;
+    }
+
+    private Long getWindowTuplesCacheSize(Map conf) {
+        if (conf.containsKey(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)) {
+            return ((Number) conf.get(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)).longValue();
+        }
+        return DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT;
+    }
+
+    @Override
+    public void cleanup() {
+        log.info("shutting down window manager");
+        tridentWindowManager.shutdown();
+    }
+
+    @Override
+    public void startBatch(ProcessorContext processorContext) {
+        // initialize state for batch
+        processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
+    }
+
+    @Override
+    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
+        // add tuple to the batch state
+        Object state = processorContext.state[tridentContext.getStateIndex()];
+        ((List<TridentTuple>) state).add(projection.create(tuple));
+    }
+
+    @Override
+    public void finishBatch(ProcessorContext processorContext) {
+
+        Object batchId = processorContext.batchId;
+        Object batchTxnId = getBatchTxnId(batchId);
+
+        log.debug("Received finishBatch of : {} ", batchId);
+        // get all the tuples in a batch and add it to trident-window-manager
+        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
+        tridentWindowManager.addTuplesBatch(batchId, tuples);
+
+        List<Integer> pendingTriggerIds = null;
+        List<String> triggerKeys = new ArrayList<>();
+        Iterable<Object> triggerValues = null;
+
+        if (retriedAttempt(batchId)) {
+            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
+            for (Integer pendingTriggerId : pendingTriggerIds) {
+                triggerKeys.add(triggerKey(pendingTriggerId));
+            }
+            triggerValues = windowStore.get(triggerKeys);
+        }
+
+        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
+        if(triggerValues == null) {
+            pendingTriggerIds = new ArrayList<>();
+            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
+            log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size());
+            try {
+                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
+                List<Object> values = new ArrayList<>();
+                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
+                while (pendingTriggersIter.hasNext()) {
+                    triggerResult = pendingTriggersIter.next();
+                    for (List<Object> aggregatedResult : triggerResult.result) {
+                        String triggerKey = triggerKey(triggerResult.id);
+                        triggerKeys.add(triggerKey);
+                        values.add(aggregatedResult);
+                        pendingTriggerIds.add(triggerResult.id);
+                    }
+                    pendingTriggersIter.remove();
+                }
+                triggerValues = values;
+            } finally {
+                // store inprocess triggers of a batch in store for batch retries for any failures
+                if (!pendingTriggerIds.isEmpty()) {
+                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
+                }
+            }
+        }
+
+        collector.setContext(processorContext);
+        int i = 0;
+        for (Object resultValue : triggerValues) {
+            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
+        }
+        collector.setContext(null);
+    }
+
+    private String inprocessTriggerKey(Object batchTxnId) {
+        return windowTriggerInprocessId + batchTxnId;
+    }
+
+    public static Object getBatchTxnId(Object batchId) {
+        if (batchId instanceof IBatchID) {
+            return ((IBatchID) batchId).getId();
+        }
+        return null;
+    }
+
+    static boolean retriedAttempt(Object batchId) {
+        if (batchId instanceof IBatchID) {
+            return ((IBatchID) batchId).getAttemptId() > 0;
+        }
+
+        return false;
+    }
+
+    @Override
+    public TridentTuple.Factory getOutputFactory() {
+        return collector.getOutputFactory();
+    }
+
+    public static class TriggerInfo implements Serializable {
+        public final String windowTaskId;
+        public final int triggerId;
+
+        public TriggerInfo(String windowTaskId, int triggerId) {
+            this.windowTaskId = windowTaskId;
+            this.triggerId = triggerId;
+        }
+
+        public String generateTriggerKey() {
+            return generateWindowTriggerKey(windowTaskId, triggerId);
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerInfo{" +
+                    "windowTaskId='" + windowTaskId + '\'' +
+                    ", triggerId=" + triggerId +
+                    '}';
+        }
+    }
+
+    public String triggerKey(int triggerId) {
+        return generateWindowTriggerKey(windowTaskId, triggerId);
+    }
+
+    public static String generateWindowTriggerKey(String windowTaskId, int triggerId) {
+        return TRIGGER_PREFIX + windowTaskId + triggerId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
new file mode 100644
index 0000000..378d24f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code State} implementation for windowing operation. This is mainly used to get callback of commit txId of batches
+ * in which triggers are emitted.
+ *
+ */
+public class WindowsState implements State {
+    private static final Logger log = LoggerFactory.getLogger(WindowsState.class);
+
+    private Long currentTxId;
+
+    public WindowsState() {
+    }
+
+    @Override
+    public void beginCommit(Long txId) {
+        currentTxId = txId;
+        log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+    }
+
+    @Override
+    public void commit(Long txId) {
+        log.debug("WindowsState.commit :: [{}]", txId);
+    }
+
+    public Long getCurrentTxId() {
+        return currentTxId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
new file mode 100644
index 0000000..28893c7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ * {@code StateFactory} instance for creating {@code WindowsState} instances.
+ *
+ */
+public class WindowsStateFactory implements StateFactory {
+
+    public WindowsStateFactory() {
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        return new WindowsState();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
new file mode 100644
index 0000000..45ac885
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.IllegalClassException;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@code StateUpdater<WindowState>} instance which removes successfully emitted triggers from store
+ */
+public class WindowsStateUpdater implements StateUpdater<WindowsState> {
+
+    private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class);
+
+    private final WindowsStoreFactory windowStoreFactory;
+    private WindowsStore windowsStore;
+
+    public WindowsStateUpdater(WindowsStoreFactory windowStoreFactory) {
+        this.windowStoreFactory = windowStoreFactory;
+    }
+
+    @Override
+    public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) {
+        Long currentTxId = state.getCurrentTxId();
+        log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId);
+        for (TridentTuple tuple : tuples) {
+            try {
+                Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+                if(! (fieldValue instanceof WindowTridentProcessor.TriggerInfo)) {
+                    throw new IllegalClassException(WindowTridentProcessor.TriggerInfo.class, fieldValue.getClass());
+                }
+                WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue;
+                String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId;
+
+                log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+
+                windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey));
+            } catch (Exception ex) {
+                log.warn(ex.getMessage());
+                collector.reportError(ex);
+                throw new FailedException(ex);
+            }
+        }
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+        windowsStore = windowStoreFactory.create();
+    }
+
+    @Override
+    public void cleanup() {
+        windowsStore.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
new file mode 100644
index 0000000..8904b7b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Store for storing window related entities like windowed tuples, triggers etc.
+ *
+ */
+public interface WindowsStore extends Serializable {
+
+    /**
+     * This can be used as a separator while generating a key from sequence of strings.
+     */
+    public static final String KEY_SEPARATOR = "|";
+
+    public Object get(String key);
+
+    public Iterable<Object> get(List<String> keys);
+
+    public Iterable<String> getAllKeys();
+
+    public void put(String key, Object value);
+
+    public void putAll(Collection<Entry> entries);
+
+    public void remove(String key);
+
+    public void removeAll(Collection<String> keys);
+
+    public void shutdown();
+
+    /**
+     * This class wraps key and value objects which can be passed to {@code putAll} method.
+     */
+    public static class Entry implements Serializable {
+        public final String key;
+        public final Object value;
+
+        public Entry(String key, Object value) {
+            nonNullCheckForKey(key);
+            nonNullCheckForValue(value);
+            this.key = key;
+            this.value = value;
+        }
+
+        public static void nonNullCheckForKey(Object key) {
+            Preconditions.checkArgument(key != null, "key argument can not be null");
+        }
+
+        public static void nonNullCheckForValue(Object value) {
+            Preconditions.checkArgument(value != null, "value argument can not be null");
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
new file mode 100644
index 0000000..409d672
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+
+/**
+ * Factory to create instances of {@code WindowsStore}.
+ *
+ */
+public interface WindowsStoreFactory extends Serializable {
+
+    /**
+     * Creates a window store
+     *
+     * @return
+     */
+    public WindowsStore create();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
new file mode 100644
index 0000000..8f9ef1a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ *
+ */
+public abstract class BaseWindowConfig implements WindowConfig {
+    protected final int windowLength;
+    protected final int slideLength;
+
+    protected BaseWindowConfig(int windowLength, int slideLength) {
+        this.windowLength = windowLength;
+        this.slideLength = slideLength;
+    }
+
+    @Override
+    public int getWindowLength() {
+        return windowLength;
+    }
+
+    @Override
+    public int getSlidingLength() {
+        return slideLength;
+    }
+
+    public void validate() {
+        if (slideLength > windowLength) {
+            throw new IllegalArgumentException("slideLength '" + slideLength + "' should always be less than windowLegth '" + windowLength + "'");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
new file mode 100644
index 0000000..a0dd13c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides
+ * at every count of given {@code slideLength}
+ *
+ */
+public final class SlidingCountWindow extends BaseWindowConfig {
+
+    private SlidingCountWindow(int windowLength, int slideLength) {
+        super(windowLength, slideLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.SLIDING_COUNT;
+    }
+
+    public static SlidingCountWindow of(int windowCount, int slidingCount) {
+        return new SlidingCountWindow(windowCount, slidingCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
new file mode 100644
index 0000000..f2fe291
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+/**
+ * Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides
+ * at every {@code slideLength} interval.
+ *
+ */
+public final class SlidingDurationWindow extends BaseWindowConfig {
+
+    private SlidingDurationWindow(int windowLength, int slideLength) {
+        super(windowLength, slideLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.SLIDING_DURATION;
+    }
+
+    public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) {
+        return new SlidingDurationWindow(windowDuration.value, slidingDuration.value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
new file mode 100644
index 0000000..a5f3528
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+/**
+ * Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events.
+ */
+public final class TumblingCountWindow extends BaseWindowConfig {
+
+    private TumblingCountWindow(int windowLength) {
+        super(windowLength, windowLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.TUMBLING_COUNT;
+    }
+
+    public static TumblingCountWindow of(int windowLength) {
+        return new TumblingCountWindow(windowLength);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
new file mode 100644
index 0000000..8beb68d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+/**
+ * Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration.
+ */
+public final class TumblingDurationWindow extends BaseWindowConfig {
+
+    private TumblingDurationWindow(int windowLength) {
+        super(windowLength, windowLength);
+    }
+
+    @Override
+    public Type getWindowType() {
+        return Type.TUMBLING_DURATION;
+    }
+
+    public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) {
+        return new TumblingDurationWindow(windowLength.value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
new file mode 100644
index 0000000..49347e7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import java.io.Serializable;
+
+/**
+ * Windowing configuration with window and sliding length.
+ */
+public interface WindowConfig extends Serializable {
+
+    /**
+     * Returns the length of the window.
+     * @return
+     */
+    public int getWindowLength();
+
+    /**
+     * Returns the sliding length of the moving window.
+     * @return
+     */
+    public int getSlidingLength();
+
+    /**
+     * Gives the type of windowing. It can be any of {@code Type} values.
+     *
+     * @return
+     */
+    public Type getWindowType();
+
+    public void validate();
+
+    public enum Type {
+        SLIDING_COUNT,
+        TUMBLING_COUNT,
+        SLIDING_DURATION,
+        TUMBLING_DURATION
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
new file mode 100644
index 0000000..ed9befa
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/BaseWindowStrategy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
+ *
+ */
+public abstract class BaseWindowStrategy<T> implements WindowStrategy<T> {
+    protected final WindowConfig windowConfig;
+
+    public BaseWindowStrategy(WindowConfig windowConfig) {
+        this.windowConfig = windowConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
new file mode 100644
index 0000000..c26b795
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingCountWindowStrategy.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.CountEvictionPolicy;
+import org.apache.storm.windowing.CountTriggerPolicy;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents sliding window strategy based on the sliding window count and sliding interval count from the
+ * given {@code slidingCountWindow} configuration.
+ */
+public class SlidingCountWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+    public SlidingCountWindowStrategy(WindowConfig slidingCountWindow) {
+        super(slidingCountWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every count of given sliding window.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+        return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after a count of given window length.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new CountEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
new file mode 100644
index 0000000..9e71220
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TimeEvictionPolicy;
+import org.apache.storm.windowing.TimeTriggerPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents sliding window strategy based on the sliding window duration and sliding interval from the
+ * given {@code slidingCountWindow} configuration.
+ *
+ **/
+public final class SlidingDurationWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+    public SlidingDurationWindowStrategy(WindowConfig slidingDurationWindow) {
+        super(slidingDurationWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after window duration is reached.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new TimeEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
new file mode 100644
index 0000000..5e4d6fe
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingCountWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.CountEvictionPolicy;
+import org.apache.storm.windowing.CountTriggerPolicy;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents tumbling window strategy based on the window count from the
+ * given {@code slidingCountWindow} configuration. In this strategy , window and sliding lengths are equal.
+ *
+ */
+public final class TumblingCountWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+    public TumblingCountWindowStrategy(WindowConfig tumblingCountWindow) {
+        super(tumblingCountWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every count of given sliding window.
+
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+        return new CountTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after a count of given window length.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new CountEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
new file mode 100644
index 0000000..4478667
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/TumblingDurationWindowStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TimeEvictionPolicy;
+import org.apache.storm.windowing.TimeTriggerPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+
+/**
+ * This class represents tumbling window strategy based on the window duration from the
+ * given {@code slidingCountWindow} configuration. In this strategy , window and sliding durations are equal.
+ *
+ */
+public final class TumblingDurationWindowStrategy<T> extends BaseWindowStrategy<T> {
+
+    public TumblingDurationWindowStrategy(WindowConfig tumblingDurationWindow) {
+        super(tumblingDurationWindow);
+    }
+
+    /**
+     * Returns a {@code TriggerPolicy} which triggers for every given sliding duration.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    @Override
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
+        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
+    }
+
+    /**
+     * Returns an {@code EvictionPolicy} instance which evicts elements after given window duration.
+     *
+     * @return
+     */
+    @Override
+    public EvictionPolicy<T> getEvictionPolicy() {
+        return new TimeEvictionPolicy<>(windowConfig.getWindowLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
new file mode 100644
index 0000000..1dfb264
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerHandler;
+import org.apache.storm.windowing.TriggerPolicy;
+/**
+ * Strategy for windowing which will have respective trigger and eviction policies.
+ */
+public interface WindowStrategy<T> {
+
+    /**
+     * Returns a {@code TriggerPolicy}  by creating with {@code triggerHandler} and {@code evictionPolicy} with
+     * the given configuration.
+     *
+     * @param triggerHandler
+     * @param evictionPolicy
+     * @return
+     */
+    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy);
+
+    /**
+     * Returns an {@code EvictionPolicy} instance for this strategy with the given configuration.
+     *
+     * @return
+     */
+    public EvictionPolicy<T> getEvictionPolicy();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
new file mode 100644
index 0000000..d8a3918
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
+ *
+ */
+public final class WindowStrategyFactory {
+
+    private WindowStrategyFactory() {
+    }
+
+    /**
+     * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
+     *
+     * @param windowConfig
+     * @return
+     */
+    public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
+        WindowStrategy<T> windowStrategy = null;
+        WindowConfig.Type windowType = windowConfig.getWindowType();
+        switch(windowType) {
+            case SLIDING_COUNT:
+                windowStrategy = new SlidingCountWindowStrategy<>(windowConfig);
+                break;
+            case TUMBLING_COUNT:
+                windowStrategy = new TumblingCountWindowStrategy<>(windowConfig);
+                break;
+            case SLIDING_DURATION:
+                windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig);
+                break;
+            case TUMBLING_DURATION:
+                windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig);
+                break;
+            default:
+                throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported");
+        }
+
+        return windowStrategy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
index eef4968..ac1fa1c 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TriggerHandler.java
@@ -21,7 +21,7 @@ package org.apache.storm.windowing;
  * The callback fired by {@link TriggerPolicy} when the trigger
  * condition is satisfied.
  */
-interface TriggerHandler {
+public interface TriggerHandler {
     /**
      * The code to execute when the {@link TriggerPolicy} condition is satisfied.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
new file mode 100644
index 0000000..03f298d
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident;
+
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.InMemoryWindowsStore;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingTest {
+
+    @Test
+    public void testWindowStrategyInstances() throws Exception {
+
+        WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10));
+        Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
+
+        WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+        Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
+
+        WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create(
+                TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)));
+        Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
+
+        WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create(
+                SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+                        new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+        Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
+    }
+
+    @Test
+    public void testWindowConfig() {
+        int windowLength = 9;
+        TumblingCountWindow tumblingCountWindow = TumblingCountWindow.of(windowLength);
+        Assert.assertTrue(tumblingCountWindow.getWindowLength() == windowLength);
+        Assert.assertTrue(tumblingCountWindow.getSlidingLength() == windowLength);
+
+        windowLength = 10;
+        int slidingLength = 2;
+        SlidingCountWindow slidingCountWindow = SlidingCountWindow.of(10, 2);
+        Assert.assertTrue(slidingCountWindow.getWindowLength() == windowLength);
+        Assert.assertTrue(slidingCountWindow.getSlidingLength() == slidingLength);
+
+        windowLength = 20;
+        TumblingDurationWindow tumblingDurationWindow = TumblingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS));
+        Assert.assertTrue(tumblingDurationWindow.getWindowLength() == windowLength*1000);
+        Assert.assertTrue(tumblingDurationWindow.getSlidingLength() == windowLength*1000);
+
+        windowLength = 50;
+        slidingLength = 10;
+        SlidingDurationWindow slidingDurationWindow = SlidingDurationWindow.of(new BaseWindowedBolt.Duration(windowLength, TimeUnit.SECONDS),
+                new BaseWindowedBolt.Duration(slidingLength, TimeUnit.SECONDS));
+        Assert.assertTrue(slidingDurationWindow.getWindowLength() == windowLength*1000);
+        Assert.assertTrue(slidingDurationWindow.getSlidingLength() == slidingLength*1000);
+    }
+
+    @Test
+    public void testInMemoryWindowStore() {
+        InMemoryWindowsStore store = new InMemoryWindowsStore();
+        String keyPrefix = "key";
+        String valuePrefix = "valuePrefix";
+
+        int ct = 10;
+        for (int i=0; i<ct; i++) {
+            store.put(keyPrefix +i, valuePrefix +i);
+        }
+
+        for (int i=0; i<ct; i++) {
+            Assert.assertTrue((valuePrefix + i).equals(store.get(keyPrefix + i)));
+        }
+
+        store.remove(keyPrefix+1);
+        Assert.assertNull(store.get(keyPrefix+1));
+
+    }
+
+}
\ No newline at end of file


[7/9] storm git commit: STORM-676 Refactoring of WindowConfig APIs

Posted by sr...@apache.org.
STORM-676 Refactoring of WindowConfig APIs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c263c7c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c263c7c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c263c7c

Branch: refs/heads/1.x-branch
Commit: 8c263c7c08a3ebd11a1d5df4996b7e12422cd721
Parents: b08d7ea
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 22:35:08 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:47:14 2016 +0530

----------------------------------------------------------------------
 .../windowing/AbstractTridentWindowManager.java |  4 +-
 .../windowing/config/SlidingCountWindow.java    |  7 ++-
 .../windowing/config/SlidingDurationWindow.java |  6 +-
 .../windowing/config/TumblingCountWindow.java   |  8 ++-
 .../config/TumblingDurationWindow.java          |  6 +-
 .../trident/windowing/config/WindowConfig.java  |  4 +-
 .../strategy/WindowStrategyFactory.java         | 60 --------------------
 .../storm/trident/TridentWindowingTest.java     | 25 ++++----
 8 files changed, 33 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index aac18d3..f93527a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -25,7 +25,6 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.config.WindowConfig;
 import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
 import org.apache.storm.windowing.EvictionPolicy;
 import org.apache.storm.windowing.TriggerPolicy;
 import org.apache.storm.windowing.WindowLifecycleListener;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -71,7 +69,7 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
 
         windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
 
-        WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
         EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
         windowManager.setEvictionPolicy(evictionPolicy);
         triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
index a0dd13c..2e2d388 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -18,6 +18,9 @@
  */
 package org.apache.storm.trident.windowing.config;
 
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
 /**
  * Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides
  * at every count of given {@code slideLength}
@@ -30,8 +33,8 @@ public final class SlidingCountWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.SLIDING_COUNT;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new SlidingCountWindowStrategy<>(this);
     }
 
     public static SlidingCountWindow of(int windowCount, int slidingCount) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
index f2fe291..befd4e3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -19,6 +19,8 @@
 package org.apache.storm.trident.windowing.config;
 
 import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
 
 /**
  * Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides
@@ -32,8 +34,8 @@ public final class SlidingDurationWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.SLIDING_DURATION;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new SlidingDurationWindowStrategy<>(this);
     }
 
     public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
index a5f3528..1988850 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -18,6 +18,10 @@
  */
 package org.apache.storm.trident.windowing.config;
 
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
 /**
  * Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events.
  */
@@ -28,8 +32,8 @@ public final class TumblingCountWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.TUMBLING_COUNT;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new TumblingCountWindowStrategy<>(this);
     }
 
     public static TumblingCountWindow of(int windowLength) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
index 8beb68d..3881a74 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -19,6 +19,8 @@
 package org.apache.storm.trident.windowing.config;
 
 import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
 
 /**
  * Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration.
@@ -30,8 +32,8 @@ public final class TumblingDurationWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.TUMBLING_DURATION;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new TumblingDurationWindowStrategy<>(this);
     }
 
     public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) {

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
index 49347e7..7cb78ee 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -18,6 +18,8 @@
  */
 package org.apache.storm.trident.windowing.config;
 
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
 import java.io.Serializable;
 
 /**
@@ -42,7 +44,7 @@ public interface WindowConfig extends Serializable {
      *
      * @return
      */
-    public Type getWindowType();
+    public <T> WindowStrategy<T> getWindowStrategy();
 
     public void validate();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
deleted file mode 100644
index d8a3918..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.windowing.strategy;
-
-import org.apache.storm.trident.windowing.config.WindowConfig;
-
-/**
- *
- */
-public final class WindowStrategyFactory {
-
-    private WindowStrategyFactory() {
-    }
-
-    /**
-     * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
-     *
-     * @param windowConfig
-     * @return
-     */
-    public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
-        WindowStrategy<T> windowStrategy = null;
-        WindowConfig.Type windowType = windowConfig.getWindowType();
-        switch(windowType) {
-            case SLIDING_COUNT:
-                windowStrategy = new SlidingCountWindowStrategy<>(windowConfig);
-                break;
-            case TUMBLING_COUNT:
-                windowStrategy = new TumblingCountWindowStrategy<>(windowConfig);
-                break;
-            case SLIDING_DURATION:
-                windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig);
-                break;
-            case TUMBLING_DURATION:
-                windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig);
-                break;
-            default:
-                throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported");
-        }
-
-        return windowStrategy;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/8c263c7c/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
index 03f298d..4b82b89 100644
--- a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -24,15 +24,8 @@ import org.apache.storm.trident.windowing.config.SlidingCountWindow;
 import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
 import org.apache.storm.trident.windowing.config.TumblingCountWindow;
 import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
-import org.junit.After;
+import org.apache.storm.trident.windowing.strategy.*;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -45,19 +38,21 @@ public class TridentWindowingTest {
     @Test
     public void testWindowStrategyInstances() throws Exception {
 
-        WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10));
+        WindowStrategy<Object> tumblingCountStrategy = TumblingCountWindow.of(10).getWindowStrategy();
         Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
 
-        WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+        WindowStrategy<Object> slidingCountStrategy = SlidingCountWindow.of(100, 10).getWindowStrategy();
         Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
 
-        WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create(
-                TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)));
+        WindowStrategy<Object> tumblingDurationStrategy = TumblingDurationWindow.of(
+                                                            new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+                                                            .getWindowStrategy();
         Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
 
-        WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create(
-                SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
-                        new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+        WindowStrategy<Object> slidingDurationStrategy = SlidingDurationWindow.of(
+                                                            new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+                                                            new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))
+                                                            .getWindowStrategy();
         Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
     }
 


[9/9] storm git commit: Added STORM-676 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-676 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0db246a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0db246a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0db246a

Branch: refs/heads/1.x-branch
Commit: b0db246add3d2ca6c9b23784c6a34a92252bb20b
Parents: b661bf8
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Mar 27 23:06:28 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Mar 27 23:06:28 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b0db246a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 06eb7be..ae99355 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-676: Storm Trident support for sliding/tumbling windows
  * STORM-1630: Add guide page for Windows users
  * STORM-1655: Flux doesn't set return code to non-zero when there's any exception while deploying topology to remote cluster
  * STORM-1654: HBaseBolt creates tick tuples with no interval when we don't set flushIntervalSecs 


[2/9] storm git commit: STORM-676 Upmerged and resolved conflicts

Posted by sr...@apache.org.
STORM-676 Upmerged and resolved conflicts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89c03b83
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89c03b83
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89c03b83

Branch: refs/heads/1.x-branch
Commit: 89c03b83e990aa7fbc732caf055e6bd09e2fc479
Parents: 139a8a3
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 12:20:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:45:52 2016 +0530

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  44 ++-
 .../TridentHBaseWindowingStoreTopology.java     | 124 +++++++++
 .../TridentWindowingInmemoryStoreTopology.java  | 134 +++++++++
 .../trident/windowing/HBaseWindowsStore.java    | 275 +++++++++++++++++++
 .../windowing/HBaseWindowsStoreFactory.java     |  51 ++++
 storm-core/src/jvm/org/apache/storm/Config.java |   8 +
 .../jvm/org/apache/storm/trident/Stream.java    | 223 +++++++++++++--
 .../apache/storm/trident/TridentTopology.java   |   4 +
 .../storm/trident/fluent/UniqueIdGen.java       |  14 +-
 .../storm/trident/operation/builtin/Debug.java  |   4 +-
 .../windowing/AbstractTridentWindowManager.java | 241 ++++++++++++++++
 .../windowing/ITridentWindowManager.java        |  59 ++++
 .../windowing/InMemoryTridentWindowManager.java |  78 ++++++
 .../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++
 .../windowing/InMemoryWindowsStoreFactory.java  |  37 +++
 .../StoreBasedTridentWindowManager.java         | 223 +++++++++++++++
 .../trident/windowing/TridentBatchTuple.java    |  42 +++
 .../windowing/WindowTridentProcessor.java       | 260 ++++++++++++++++++
 .../storm/trident/windowing/WindowsState.java   |  52 ++++
 .../trident/windowing/WindowsStateFactory.java  |  40 +++
 .../trident/windowing/WindowsStateUpdater.java  |  81 ++++++
 .../storm/trident/windowing/WindowsStore.java   |  78 ++++++
 .../trident/windowing/WindowsStoreFactory.java  |  35 +++
 .../windowing/config/BaseWindowConfig.java      |  48 ++++
 .../windowing/config/SlidingCountWindow.java    |  40 +++
 .../windowing/config/SlidingDurationWindow.java |  42 +++
 .../windowing/config/TumblingCountWindow.java   |  39 +++
 .../config/TumblingDurationWindow.java          |  40 +++
 .../trident/windowing/config/WindowConfig.java  |  55 ++++
 .../windowing/strategy/BaseWindowStrategy.java  |  32 +++
 .../strategy/SlidingCountWindowStrategy.java    |  59 ++++
 .../strategy/SlidingDurationWindowStrategy.java |  60 ++++
 .../strategy/TumblingCountWindowStrategy.java   |  60 ++++
 .../TumblingDurationWindowStrategy.java         |  60 ++++
 .../windowing/strategy/WindowStrategy.java      |  45 +++
 .../strategy/WindowStrategyFactory.java         |  60 ++++
 .../apache/storm/windowing/TriggerHandler.java  |   2 +-
 .../storm/trident/TridentWindowingTest.java     | 110 ++++++++
 38 files changed, 3019 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index a44e14c..e702a5d 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -31,10 +31,13 @@
   <name>storm-starter</name>
 
   <properties>
-     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-     <!-- see comment below... This fixes an annoyance with intellij -->
-     <provided.scope>provided</provided.scope>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <!-- see comment below... This fixes an annoyance with intellij -->
+    <provided.scope>provided</provided.scope>
+    <hbase.version>0.98.4-hadoop2</hbase.version>
+    <hbase.version>1.1.2</hbase.version>
   </properties>
+
   <profiles>
     <profile>
         <id>intellij</id>
@@ -139,6 +142,11 @@
       <artifactId>storm-hdfs</artifactId>
       <version>${project.version}</version>
     </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>storm-hbase</artifactId>
+          <version>${project.version}</version>
+      </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
@@ -167,6 +175,36 @@
       <artifactId>storm-redis</artifactId>
       <version>${project.version}</version>
     </dependency>
+      <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <version>${hbase.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-log4j12</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.apache.zookeeper</groupId>
+                  <artifactId>zookeeper</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-log4j12</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.apache.zookeeper</groupId>
+                  <artifactId>zookeeper</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
new file mode 100644
index 0000000..24cc8d9
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+    private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+    public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+
+        Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+                new Split(), new Fields("word"))
+                .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//                .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+                .each(new Fields("count"), new Debug())
+                .each(new Fields("count"), new Echo(), new Fields("ct"));
+
+        return topology.build();
+    }
+
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    static class Echo implements Function {
+
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            log.info("##########Echo.execute: " + tuple);
+            collector.emit(tuple.getValues());
+        }
+
+        @Override
+        public void prepare(Map conf, TridentOperationContext context) {
+
+        }
+
+        @Override
+        public void cleanup() {
+
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+
+        // window-state table should already be created with cf:tuples column
+        HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            String topologyName = "wordCounterWithWindowing";
+            cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory));
+            Utils.sleep(120 * 1000);
+            cluster.killTopology(topologyName);
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
new file mode 100644
index 0000000..5f0cb4f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingInmemoryStoreTopology {
+    private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+
+    public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+
+        Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+                new Split(), new Fields("word"))
+                .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//                .aggregate(new CountAsAggregator(), new Fields("count"))
+                .each(new Fields("count"), new Debug())
+                .each(new Fields("count"), new Echo(), new Fields("ct"))
+                .each(new Fields("ct"), new Debug());
+
+        return topology.build();
+    }
+
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    static class Echo implements Function {
+
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            log.info("##########Echo.execute: " + tuple);
+            collector.emit(tuple.getValues());
+        }
+
+        @Override
+        public void prepare(Map conf, TridentOperationContext context) {
+
+        }
+
+        @Override
+        public void cleanup() {
+
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
+
+        if (args.length == 0) {
+            List<? extends WindowConfig> list = Arrays.asList(
+                    SlidingCountWindow.of(1000, 100)
+                    ,TumblingCountWindow.of(1000)
+                    ,SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
+                    ,TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
+            );
+
+            for (WindowConfig windowConfig : list) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
+                Utils.sleep(60 * 1000);
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
new file mode 100644
index 0000000..47879a4
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+    private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+    public static final String UTF_8 = "utf-8";
+
+    private final ThreadLocal<HTable> threadLocalHtable;
+    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        threadLocalHtable = new ThreadLocal<HTable>() {
+            @Override
+            protected HTable initialValue() {
+                try {
+                    HTable hTable = new HTable(config, tableName);
+                    htables.add(hTable);
+                    return hTable;
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+    }
+
+    private HTable htable() {
+        return threadLocalHtable.get();
+    }
+
+    private byte[] effectiveKey(String key) {
+        try {
+            return key.getBytes(UTF_8);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Object get(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        byte[] effectiveKey = effectiveKey(key);
+        Get get = new Get(effectiveKey);
+        Result result = null;
+        try {
+            result = htable().get(get);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if(result.isEmpty()) {
+            return null;
+        }
+
+        Kryo kryo = new Kryo();
+        Input input = new Input(result.getValue(family, qualifier));
+        Object resultObject = kryo.readClassAndObject(input);
+        return resultObject;
+
+    }
+
+    @Override
+    public Iterable<Object> get(List<String> keys) {
+        List<Get> gets = new ArrayList<>();
+        for (String key : keys) {
+            WindowsStore.Entry.nonNullCheckForKey(key);
+
+            byte[] effectiveKey = effectiveKey(key);
+            gets.add(new Get(effectiveKey));
+        }
+
+        Result[] results = null;
+        try {
+            results = htable().get(gets);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        Kryo kryo = new Kryo();
+        List<Object> values = new ArrayList<>();
+        for (int i=0; i<results.length; i++) {
+            Result result = results[i];
+            if(result.isEmpty()) {
+                log.error("Got empty result for key [{}]", keys.get(i));
+                throw new RuntimeException("Received empty result for key: "+keys.get(i));
+            }
+            Input input = new Input(result.getValue(family, qualifier));
+            Object resultObject = kryo.readClassAndObject(input);
+            values.add(resultObject);
+        }
+
+        return values;
+    }
+
+    @Override
+    public Iterable<String> getAllKeys() {
+        Scan scan = new Scan();
+        // this filter makes sure to receive only Key or row but not values associated with those rows.
+        scan.setFilter(new FirstKeyOnlyFilter());
+        //scan.setCaching(1000);
+
+        final Iterator<Result> resultIterator;
+        try {
+            resultIterator = htable().getScanner(scan).iterator();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        final Iterator<String> iterator = new Iterator<String>() {
+            @Override
+            public boolean hasNext() {
+                return resultIterator.hasNext();
+            }
+
+            @Override
+            public String next() {
+                Result result = resultIterator.next();
+                String key = null;
+                try {
+                    key = new String(result.getRow(), UTF_8);
+                } catch (UnsupportedEncodingException e) {
+                    throw new RuntimeException(e);
+                }
+                return key;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove operation is not supported");
+            }
+        };
+
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                return iterator;
+            }
+        };
+    }
+
+    @Override
+    public void put(String key, Object value) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+        WindowsStore.Entry.nonNullCheckForValue(value);
+
+        if(value == null) {
+            throw new IllegalArgumentException("Invalid value of null with key: "+key);
+        }
+        Put put = new Put(effectiveKey(key));
+        Kryo kryo = new Kryo();
+        Output output = new Output(new ByteArrayOutputStream());
+        kryo.writeClassAndObject(output, value);
+        put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position()));
+        try {
+            htable().put(put);
+        } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void putAll(Collection<Entry> entries) {
+        List<Put> list = new ArrayList<>();
+        for (Entry entry : entries) {
+            Put put = new Put(effectiveKey(entry.key));
+            Output output = new Output(new ByteArrayOutputStream());
+            Kryo kryo = new Kryo();
+            kryo.writeClassAndObject(output, entry.value);
+            put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position()));
+            list.add(put);
+        }
+
+        try {
+            htable().put(list);
+        } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void remove(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis());
+        try {
+            htable().delete(delete);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void removeAll(Collection<String> keys) {
+        List<Delete> deleteBatch = new ArrayList<>();
+        for (String key : keys) {
+            WindowsStore.Entry.nonNullCheckForKey(key);
+
+            Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis());
+            deleteBatch.add(delete);
+        }
+        try {
+            htable().delete(deleteBatch);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        // close all the created hTable instances
+        for (HTable htable : htables) {
+            try {
+                htable.close();
+            } catch (IOException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
new file mode 100644
index 0000000..56fad58
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+    private final Map<String, Object> config;
+    private final String tableName;
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
+        this.config = config;
+        this.tableName = tableName;
+        this.family = family;
+        this.qualifier = qualifier;
+    }
+
+    public WindowsStore create() {
+        Configuration configuration = HBaseConfiguration.create();
+        for (Map.Entry<String, Object> entry : config.entrySet()) {
+            if (entry.getValue() != null) {
+                configuration.set(entry.getKey(), entry.getValue().toString());
+            }
+        }
+        return new HBaseWindowsStore(configuration, tableName, family, qualifier);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 37565b8..507614b 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1974,6 +1974,14 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
 
     /**
+     * Maximum number of tuples that can be stored inmemory cache in windowing operators for fast access without fetching
+     * them from store.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT="topology.trident.windowing.cache.tuple.limit";
+
+    /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
      */
     @isString

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 4a51b56..b3a4446 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.topology.ResourceDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
@@ -68,10 +69,22 @@ import org.apache.storm.trident.state.StateSpec;
 import org.apache.storm.trident.state.StateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowTridentProcessor;
+import org.apache.storm.trident.windowing.WindowsStateFactory;
+import org.apache.storm.trident.windowing.WindowsStateUpdater;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
 /**
  * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
@@ -92,10 +105,10 @@ import java.util.Comparator;
  */
 // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
 public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
-    Node _node;
-    TridentTopology _topology;
-    String _name;
-    
+    final Node _node;
+    final String _name;
+    private final TridentTopology _topology;
+
     protected Stream(TridentTopology topology, String name, Node node) {
         _topology = topology;
         _node = node;
@@ -180,7 +193,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      */
     public GroupedStream groupBy(Fields fields) {
         projectionValidation(fields);
-        return new GroupedStream(this, fields);        
+        return new GroupedStream(this, fields);
     }
 
     /**
@@ -287,7 +300,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         if(_node instanceof PartitionNode) {
             return each(new Fields(), new TrueFilter()).partition(grouping);
         } else {
-            return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));       
+            return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
         }
     }
 
@@ -323,7 +336,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
                     functionFields,
                     new AggregateProcessor(inputFields, agg)));
     }
-    
+
     public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
         projectionValidation(inputFields);
         String stateId = state._node.stateInfo.id;
@@ -335,11 +348,11 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         _topology._colocate.get(stateId).add(n);
         return _topology.addSourcedNode(this, n);
     }
-    
+
     public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields) {
       return partitionPersist(new StateSpec(stateFactory), inputFields, updater, functionFields);
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {
         projectionValidation(inputFields);
         String id = _topology.getUniqueStateId();
@@ -352,19 +365,19 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         n.stateInfo = new NodeStateInfo(id, stateSpec);
         return _topology.addSourcedStateNode(this, n);
     }
-    
+
     public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater) {
       return partitionPersist(stateFactory, inputFields, updater, new Fields());
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater) {
-      return partitionPersist(stateSpec, inputFields, updater, new Fields());        
+      return partitionPersist(stateSpec, inputFields, updater, new Fields());
     }
-    
+
     public Stream each(Function function, Fields functionFields) {
         return each(null, function, functionFields);
     }
-    
+
     public Stream each(Fields inputFields, Filter filter) {
         return each(inputFields, new FilterExecutor(filter), new Fields());
     }
@@ -448,7 +461,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public ChainedAggregatorDeclarer chainedAgg() {
         return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme());
     }
-    
+
     public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
         return partitionAggregate(null, agg, functionFields);
     }
@@ -462,8 +475,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         return chainedAgg()
                .partitionAggregate(inputFields, agg, functionFields)
                .chainEnd();
-    }  
-    
+    }
+
     public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
         return partitionAggregate(null, agg, functionFields);
     }
@@ -565,7 +578,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public Stream aggregate(Aggregator agg, Fields functionFields) {
         return aggregate(null, agg, functionFields);
     }
-    
+
     public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
         projectionValidation(inputFields);
         return chainedAgg()
@@ -594,19 +607,169 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
                 .aggregate(inputFields, agg, functionFields)
                 .chainEnd();
     }
-    
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents window tuples count
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents no of tuples in the window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window with {@code slideCount}.
+     *
+     * @param windowCount represents tuples count of a window
+     * @param slideCount represents sliding count window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+     *
+     * @param windowDuration represents tumbling window duration configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+     * and completes a window at {@code windowDuration}
+     *
+     * @param windowDuration represents window duration configuration
+     * @param slideDuration represents sliding duration  configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+                                    WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return
+     */
+    public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        // this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
+        // as false int he below call.
+        InMemoryWindowsStoreFactory inMemoryWindowsStoreFactory = new InMemoryWindowsStoreFactory();
+        return window(windowConfig, inMemoryWindowsStoreFactory, inputFields, aggregator, functionFields, false);
+    }
+
+    /**
+     * Returns stream of aggregated results based on the given window configuration.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param windowStoreFactory intermediary tuple store for storing tuples for windowing
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return
+     */
+    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+                         Aggregator aggregator, Fields functionFields) {
+        return window(windowConfig, windowStoreFactory, inputFields, aggregator, functionFields, true);
+    }
+
+    private Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator,
+                          Fields functionFields, boolean storeTuplesInStore) {
+        projectionValidation(inputFields);
+        windowConfig.validate();
+
+        Fields fields = addTriggerField(functionFields);
+
+        // when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
+        // that store is passed to WindowStateUpdater to remove them after committing the batch.
+        Stream stream = _topology.addSourcedNode(this,
+                new ProcessorNode(_topology.getUniqueStreamId(),
+                        _name,
+                        fields,
+                        fields,
+                        new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
+                                inputFields, aggregator, storeTuplesInStore)));
+
+        Stream effectiveStream = stream.project(functionFields);
+
+        // create StateUpdater with the given windowStoreFactory to remove triggered aggregation results form store
+        // when they are successfully processed.
+        StateFactory stateFactory = new WindowsStateFactory();
+        StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
+        stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
+
+        return effectiveStream;
+    }
+
+    private Fields addTriggerField(Fields functionFields) {
+        List<String> fieldsList = new ArrayList<>();
+        fieldsList.add(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+        for (String field : functionFields) {
+            fieldsList.add(field);
+        }
+        return new Fields(fieldsList);
+    }
+
     public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater, Fields functionFields) {
         return partitionPersist(new StateSpec(stateFactory), updater, functionFields);
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater, Fields functionFields) {
         return partitionPersist(stateSpec, null, updater, functionFields);
     }
-    
+
     public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater) {
         return partitionPersist(stateFactory, updater, new Fields());
     }
-    
+
     public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater) {
         return partitionPersist(stateSpec, updater, new Fields());
     }
@@ -622,7 +785,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
         return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
     }
-    
+
     public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
         projectionValidation(inputFields);
         // replaces normal aggregation here with a global grouping because it needs to be consistent across batches 
@@ -648,7 +811,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         projectionValidation(inputFields);
         return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
     }
-    
+
     public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
         return stateQuery(state, null, function, functionFields);
     }
@@ -662,7 +825,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     public Fields getOutputFields() {
         return _node.allOutputFields;
     }
-    
+
     static class BatchGlobalAggScheme implements GlobalAggregationScheme<Stream> {
 
         @Override
@@ -674,9 +837,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         public BatchToPartition singleEmitPartitioner() {
             return new IndexHashBatchToPartition();
         }
-        
+
     }
-    
+
     static class GlobalAggScheme implements GlobalAggregationScheme<Stream> {
 
         @Override
@@ -688,7 +851,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
         public BatchToPartition singleEmitPartitioner() {
             return new GlobalBatchToPartition();
         }
-        
+
     }
 
     private void projectionValidation(Fields projFields) {

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index e0a349b..06e1576 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -829,6 +829,10 @@ public class TridentTopology {
     protected String getUniqueStateId() {
         return _gen.getUniqueStateId();
     }
+
+    protected String getUniqueWindowId() {
+        return _gen.getUniqueWindowId();
+    }
     
     protected void registerNode(Node n) {
         _graph.addVertex(n);

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
index 1364faf..e063142 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java
@@ -18,17 +18,21 @@
 package org.apache.storm.trident.fluent;
 
 public class UniqueIdGen {
-    int _streamCounter = 0;
-    
+    private int _streamCounter = 0;
+    private int _stateCounter = 0;
+    private int windowCounter = 0;
+
     public String getUniqueStreamId() {
         _streamCounter++;
         return "s" + _streamCounter;
     }
 
-    int _stateCounter = 0;
-    
     public String getUniqueStateId() {
         _stateCounter++;
         return "state" + _stateCounter;
-    }    
+    }
+
+    public String getUniqueWindowId() {
+        return "w"+ (++windowCounter);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 07c5ae4..87c4167 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -20,6 +20,8 @@ package org.apache.storm.trident.operation.builtin;
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.Date;
+
 /**
  * Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`.
  */
@@ -40,7 +42,7 @@ public class Debug extends BaseFilter {
 
     @Override
     public boolean isKeep(TridentTuple tuple) {
-        System.out.println(name + tuple.toString());
+        System.out.println("<"+new Date()+"> "+name + tuple.toString());
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
new file mode 100644
index 0000000..2941e28
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ *
+ */
+public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+    protected final WindowManager<T> windowManager;
+    protected final Aggregator aggregator;
+    protected final BatchOutputCollector delegateCollector;
+    protected final String windowTaskId;
+    protected final WindowsStore windowStore;
+
+    protected final Set<String> activeBatches = new HashSet<>();
+    protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
+    protected final AtomicInteger triggerId = new AtomicInteger();
+    private final String windowTriggerCountId;
+    private final TriggerPolicy<T> triggerPolicy;
+
+    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
+        this.windowTaskId = windowTaskId;
+        this.windowStore = windowStore;
+        this.aggregator = aggregator;
+        this.delegateCollector = delegateCollector;
+
+        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+        WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
+        windowManager.setEvictionPolicy(evictionPolicy);
+        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+        windowManager.setTriggerPolicy(triggerPolicy);
+    }
+
+    @Override
+    public void prepare() {
+        preInitialize();
+
+        initialize();
+
+        postInitialize();
+    }
+
+    private void preInitialize() {
+        log.debug("Getting current trigger count for this component/task");
+        // get trigger count value from store
+        Object result = windowStore.get(windowTriggerCountId);
+        Integer currentCount = 0;
+        if(result == null) {
+            log.info("No current trigger count in windows store.");
+        } else {
+            currentCount = (Integer) result + 1;
+        }
+        windowStore.put(windowTriggerCountId, currentCount);
+        triggerId.set(currentCount);
+    }
+
+    private void postInitialize() {
+        // start trigger once the initialization is done.
+        triggerPolicy.start();
+    }
+
+    /**
+     * Load and initialize any resources into window manager before windowing for component/task is activated.
+     */
+    protected abstract void initialize();
+
+    /**
+     * Listener to reeive any activation/expiry of windowing events and take further action on them.
+     */
+    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
+
+        @Override
+        public void onExpiry(List<T> expiredEvents) {
+            log.debug("onExpiry is invoked");
+            onTuplesExpired(expiredEvents);
+        }
+
+        @Override
+        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
+            log.debug("onActivation is invoked with events size: {}", events.size());
+            // trigger occurred, create an aggregation and keep them in store
+            int currentTriggerId = triggerId.incrementAndGet();
+            execAggregatorAndStoreResult(currentTriggerId, events);
+        }
+    }
+
+    /**
+     * Handle expired tuple events which can be removing from cache or store.
+     *
+     * @param expiredEvents
+     */
+    protected abstract void onTuplesExpired(List<T> expiredEvents);
+
+    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
+        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
+
+        // run aggregator to compute the result
+        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
+        Object state = aggregator.init(currentTriggerId, collector);
+        for (TridentTuple resultTuple : resultTuples) {
+            aggregator.aggregate(state, resultTuple, collector);
+        }
+        aggregator.complete(state, collector);
+
+        List<List<Object>> resultantAggregatedValue = collector.values;
+
+        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
+                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
+        windowStore.putAll(entries);
+
+        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
+    }
+
+    /**
+     * Return {@code TridentTuple}s from given {@code tupleEvents}.
+     * @param tupleEvents
+     * @return
+     */
+    protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents);
+
+    /**
+     * This {@code TridentCollector} accumulates all the values emitted.
+     */
+    static class AccumulatedTuplesCollector implements TridentCollector {
+
+        final List<List<Object>> values = new ArrayList<>();
+        private final BatchOutputCollector delegateCollector;
+
+        public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) {
+            this.delegateCollector = delegateCollector;
+        }
+
+        @Override
+        public void emit(List<Object> values) {
+            this.values.add(values);
+        }
+
+        @Override
+        public void reportError(Throwable t) {
+            delegateCollector.reportError(t);
+        }
+
+    }
+
+    static class TriggerResult {
+        final int id;
+        final List<List<Object>> result;
+
+        public TriggerResult(int id, List<List<Object>> result) {
+            this.id = id;
+            this.result = result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof TriggerResult)) return false;
+
+            TriggerResult that = (TriggerResult) o;
+
+            return id == that.id;
+
+        }
+
+        @Override
+        public int hashCode() {
+            return id;
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerResult{" +
+                    "id=" + id +
+                    ", result=" + result +
+                    '}';
+        }
+    }
+
+    public Queue<TriggerResult> getPendingTriggers() {
+        return pendingTriggers;
+    }
+
+    public void shutdown() {
+        try {
+            log.info("window manager [{}] is being shutdown", windowManager);
+            windowManager.shutdown();
+        } finally {
+            log.info("window store [{}] is being shutdown", windowStore);
+            windowStore.shutdown();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
new file mode 100644
index 0000000..d4aef79
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Window manager to handle trident tuple events.
+ */
+public interface ITridentWindowManager {
+
+    /**
+     * This is invoked from {@code org.apache.storm.trident.planner.TridentProcessor}'s  prepare method. So any
+     * initialization tasks can be done before the topology starts accepting tuples. For ex:
+     * initialize window manager with any earlier stored tuples/triggers and start WindowManager
+     */
+    public void prepare();
+
+    /**
+     * This is invoked when from {@code org.apache.storm.trident.planner.TridentProcessor}'s  cleanup method. So, any
+     * cleanup operations like clearing cache or close store connection etc can be done.
+     */
+    public void shutdown();
+
+    /**
+     * Add received batch of tuples to cache/store and add them to {@code WindowManager}
+     *
+     * @param batchId
+     * @param tuples
+     */
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples);
+
+    /**
+     * Returns pending triggers to be emitted.
+     *
+     * @return
+     */
+    public Queue<StoreBasedTridentWindowManager.TriggerResult> getPendingTriggers();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
new file mode 100644
index 0000000..cbb30af
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory.
+ */
+public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
+    private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+
+    public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                        BatchOutputCollector delegateCollector) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+    }
+
+    @Override
+    protected void initialize() {
+        log.debug("noop in initialize");
+    }
+
+    @Override
+    public List<TridentTuple> getTridentTuples(List<TridentTuple> tridentBatchTuples) {
+        return tridentBatchTuples;
+    }
+
+    @Override
+    public void onTuplesExpired(List<TridentTuple> expiredTuples) {
+        log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+    }
+
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            log.info("Ignoring already added tuples with batch: %s", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: ", batchId);
+        for (TridentTuple tridentTuple : tuples) {
+            windowManager.add(tridentTuple);
+        }
+    }
+
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
new file mode 100644
index 0000000..02d78e7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Inmemory store implementation of {@code WindowsStore} which can be backed by persistent store.
+ *
+ */
+public class InMemoryWindowsStore implements WindowsStore, Serializable {
+
+    private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
+
+    private int maxSize;
+    private AtomicInteger currentSize;
+    private WindowsStore backingStore;
+
+    public InMemoryWindowsStore() {
+    }
+
+    /**
+     *
+     * @param maxSize maximum size of inmemory store
+     * @param backingStore backing store containing the entries
+     */
+    public InMemoryWindowsStore(int maxSize, WindowsStore backingStore) {
+        this.maxSize = maxSize;
+        currentSize = new AtomicInteger();
+        this.backingStore = backingStore;
+    }
+
+    @Override
+    public Object get(String key) {
+        Object value = store.get(key);
+
+        if(value == null && backingStore != null) {
+            value = backingStore.get(key);
+        }
+
+        return value;
+    }
+
+    @Override
+    public Iterable<Object> get(List<String> keys) {
+        List<Object> values = new ArrayList<>();
+        for (String key : keys) {
+            values.add(get(key));
+        }
+        return values;
+    }
+
+    @Override
+    public Iterable<String> getAllKeys() {
+        if(backingStore != null) {
+            return backingStore.getAllKeys();
+        }
+
+        final Enumeration<String> storeEnumeration = store.keys();
+        final Iterator<String> resultIterator = new Iterator<String>() {
+            @Override
+            public boolean hasNext() {
+                return storeEnumeration.hasMoreElements();
+            }
+
+            @Override
+            public String next() {
+                return  storeEnumeration.nextElement();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove operation is not supported as it is immutable.");
+            }
+        };
+
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                return resultIterator;
+            }
+        };
+    }
+
+    @Override
+    public void put(String key, Object value) {
+        _put(key, value);
+
+        if(backingStore != null) {
+            backingStore.put(key, value);
+        }
+    }
+
+    private void _put(String key, Object value) {
+        if(!canAdd()) {
+            return;
+        }
+
+        store.put(key, value);
+        incrementCurrentSize();
+    }
+
+    private void incrementCurrentSize() {
+        if(backingStore != null) {
+            currentSize.incrementAndGet();
+        }
+    }
+
+    private boolean canAdd() {
+        return backingStore == null || currentSize.get() < maxSize;
+    }
+
+    @Override
+    public void putAll(Collection<Entry> entries) {
+        for (Entry entry : entries) {
+            _put(entry.key, entry.value);
+        }
+        if(backingStore != null) {
+            backingStore.putAll(entries);
+        }
+    }
+
+    @Override
+    public void remove(String key) {
+        _remove(key);
+
+        if(backingStore != null) {
+            backingStore.remove(key);
+        }
+    }
+
+    private void _remove(String key) {
+        Object oldValue = store.remove(key);
+
+        if(oldValue != null) {
+            decrementSize();
+            if(backingStore != null) {
+                backingStore.remove(key);
+            }
+        }
+
+    }
+
+    private void decrementSize() {
+        if(backingStore != null) {
+            currentSize.decrementAndGet();
+        }
+    }
+
+    @Override
+    public void removeAll(Collection<String> keys) {
+        for (String key : keys) {
+            _remove(key);
+        }
+
+        if(backingStore != null) {
+            backingStore.removeAll(keys);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        store.clear();
+
+        if(backingStore != null) {
+            backingStore.shutdown();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "InMemoryWindowsStore{" +
+                " store:size = " + store.size() +
+                " backingStore = " + backingStore +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
new file mode 100644
index 0000000..32027a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+/**
+ * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ *
+ */
+public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
+
+    private InMemoryWindowsStore inMemoryWindowsStore;
+
+    @Override
+    public WindowsStore create() {
+        if(inMemoryWindowsStore == null) {
+            inMemoryWindowsStore = new InMemoryWindowsStore();
+        }
+        return inMemoryWindowsStore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
new file mode 100644
index 0000000..885e508
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This window manager uses {@code WindowsStore} for storing tuples and other trigger related information. It maintains
+ * tuples cache of {@code maxCachedTuplesSize} without accessing store for getting them.
+ *
+ */
+public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
+    private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+
+    private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
+
+    private final String windowTupleTaskId;
+    private final TridentTupleView.FreshOutputFactory freshOutputFactory;
+
+    private Long maxCachedTuplesSize;
+    private final Fields inputFields;
+    private AtomicLong currentCachedTuplesSize = new AtomicLong();
+
+    public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                          BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+
+        this.maxCachedTuplesSize = maxTuplesCacheSize;
+        this.inputFields = inputFields;
+        freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields);
+        windowTupleTaskId = TUPLE_PREFIX + windowTaskId;
+    }
+
+    protected void initialize() {
+
+        // get existing tuples and pending/unsuccessful triggers for this operator-component/task and add them to WindowManager
+        String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId);
+        String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId);
+
+        List<String> attemptedTriggerKeys = new ArrayList<>();
+        List<String> triggerKeys = new ArrayList<>();
+
+        Iterable<String> allEntriesIterable = windowStore.getAllKeys();
+        for (String key : allEntriesIterable) {
+            if (key.startsWith(windowTupleTaskId)) {
+                int tupleIndexValue = lastPart(key);
+                String batchId = secondLastPart(key);
+                log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+                windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue));
+            } else if (key.startsWith(windowTriggerTaskId)) {
+                triggerKeys.add(key);
+                log.debug("Received trigger with key [{}]", key);
+            } else if(key.startsWith(windowTriggerInprocessId)) {
+                attemptedTriggerKeys.add(key);
+                log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
+            }
+        }
+
+        // these triggers will be retried as part of batch retries
+        Set<Integer> triggersToBeIgnored = new HashSet<>();
+        Iterable<Object> attemptedTriggers = windowStore.get(attemptedTriggerKeys);
+        for (Object attemptedTrigger : attemptedTriggers) {
+            triggersToBeIgnored.addAll((List<Integer>) attemptedTrigger);
+        }
+
+        // get trigger values only if they have more than zero
+        Iterable<Object> triggerObjects = windowStore.get(triggerKeys);
+        int i=0;
+        for (Object triggerObject : triggerObjects) {
+            int id = lastPart(triggerKeys.get(i++));
+            if(!triggersToBeIgnored.contains(id)) {
+                log.info("Adding pending trigger value [{}]", triggerObject);
+                pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
+            }
+        }
+
+    }
+
+    private int lastPart(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("primaryKey does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        return Integer.parseInt(key.substring(lastSepIndex+1));
+    }
+
+    private String secondLastPart(String key) {
+        int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key "+key+" does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+        String trimKey = key.substring(0, lastSepIndex);
+        int secondLastSepIndex = trimKey.lastIndexOf(WindowsStore.KEY_SEPARATOR);
+        if (lastSepIndex < 0) {
+            throw new IllegalArgumentException("key "+key+" does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'");
+        }
+
+        return key.substring(secondLastSepIndex+1, lastSepIndex);
+    }
+
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            log.info("Ignoring already added tuples with batch: %s", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: ", batchId);
+        List<WindowsStore.Entry> entries = new ArrayList<>();
+        for (int i = 0; i < tuples.size(); i++) {
+            String key = keyOf(batchId);
+            TridentTuple tridentTuple = tuples.get(i);
+            entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
+        }
+
+        // tuples should be available in store before they are added to window manager
+        windowStore.putAll(entries);
+
+        for (int i = 0; i < tuples.size(); i++) {
+            String key = keyOf(batchId);
+            TridentTuple tridentTuple = tuples.get(i);
+            addToWindowManager(i, key, tridentTuple);
+        }
+
+    }
+
+    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
+        TridentTuple actualTuple = null;
+        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
+            actualTuple = tridentTuple;
+        }
+        currentCachedTuplesSize.incrementAndGet();
+        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
+    }
+
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+    public String keyOf(Object batchId) {
+        return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR;
+    }
+
+    public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) {
+        List<TridentTuple> resultTuples = new ArrayList<>();
+        List<String> keys = new ArrayList<>();
+        for (TridentBatchTuple tridentBatchTuple : tridentBatchTuples) {
+            TridentTuple tuple = collectTridentTupleOrKey(tridentBatchTuple, keys);
+            if(tuple != null) {
+                resultTuples.add(tuple);
+            }
+        }
+
+        if(keys.size() > 0) {
+            Iterable<Object> storedTupleValues = windowStore.get(keys);
+            for (Object storedTupleValue : storedTupleValues) {
+                TridentTuple tridentTuple = freshOutputFactory.create((List<Object>) storedTupleValue);
+                resultTuples.add(tridentTuple);
+            }
+        }
+
+        return resultTuples;
+    }
+
+    public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) {
+        if (tridentBatchTuple.tridentTuple != null) {
+            return tridentBatchTuple.tridentTuple;
+        }
+        keys.add(tupleKey(tridentBatchTuple));
+        return null;
+    }
+
+    public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) {
+        if (maxCachedTuplesSize != null) {
+            currentCachedTuplesSize.addAndGet(-expiredTuples.size());
+        }
+
+        List<String> keys = new ArrayList<>();
+        for (TridentBatchTuple expiredTuple : expiredTuples) {
+            keys.add(tupleKey(expiredTuple));
+        }
+
+        windowStore.removeAll(keys);
+    }
+
+    private String tupleKey(TridentBatchTuple tridentBatchTuple) {
+        return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex;
+    }
+
+}


[6/9] storm git commit: STORM-676 Addressed review comments

Posted by sr...@apache.org.
STORM-676 Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b08d7eaf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b08d7eaf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b08d7eaf

Branch: refs/heads/1.x-branch
Commit: b08d7eaf7099e4da74010501a189818ec11b00bc
Parents: 3a96f20
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 19:03:55 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:47:03 2016 +0530

----------------------------------------------------------------------
 .../trident/windowing/HBaseWindowsStore.java    |  2 +-
 .../windowing/HBaseWindowsStoreFactory.java     |  1 +
 .../jvm/org/apache/storm/trident/Stream.java    | 22 +++++++++++---------
 .../windowing/WindowTridentProcessor.java       |  2 +-
 4 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index ff3fbf9..b300ed6 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -55,7 +55,7 @@ public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal<HTable> threadLocalHtable;
-    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index a49bc87..a47d5fb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -26,6 +26,7 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
 import java.util.Map;
 
 /**
+ * Factory to create {@link HBaseWindowsStore} instances.
  *
  */
 public class HBaseWindowsStoreFactory implements WindowsStoreFactory {

http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 444b42a..23ac34a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -611,13 +611,13 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     /**
      * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
      *
-     * @param windowCount represents no of tuples in the window
+     * @param windowCount represents number of tuples in the window
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
                                       Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -626,7 +626,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
 
     /**
      * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
-     * and slides the window with {@code slideCount}.
+     * and slides the window after {@code slideCount}.
      *
      * @param windowCount represents tuples count of a window
      * @param slideCount the number of tuples after which the window slides
@@ -635,7 +635,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -643,7 +643,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     }
 
     /**
-     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+     * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
      *
      * @param windowDuration represents tumbling window duration configuration
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
@@ -651,7 +651,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -659,7 +659,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     }
 
     /**
-     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
      * and completes a window at {@code windowDuration}
      *
      * @param windowDuration represents window duration configuration
@@ -669,7 +669,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
                                     WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -683,7 +683,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param inputFields input fields
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
-     * @return
+     *
+     * @return the new stream with this operation.
      */
     public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
         // this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
@@ -700,7 +701,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param inputFields input fields
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
-     * @return
+     *
+     * @return the new stream with this operation.
      */
     public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
                          Aggregator aggregator, Fields functionFields) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index 8898b13..5125e41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -53,7 +53,7 @@ public class WindowTridentProcessor implements TridentProcessor {
     public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
 
     public static final String TRIGGER_FIELD_NAME = "_task_info";
-    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
+    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L;
 
     private final String windowId;
     private final Fields inputFields;


[8/9] storm git commit: Merge branch 'STORM-676-1.x' of http://github.com/satishd/storm into STORM-676-1.x

Posted by sr...@apache.org.
Merge branch 'STORM-676-1.x' of http://github.com/satishd/storm into STORM-676-1.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b661bf81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b661bf81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b661bf81

Branch: refs/heads/1.x-branch
Commit: b661bf81ec084ee65c7793746e576dcde5157a3c
Parents: 1783d7a 8c263c7
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Mar 27 22:12:26 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Mar 27 22:12:26 2016 -0700

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  13 +-
 .../TridentHBaseWindowingStoreTopology.java     |  93 +++++++
 .../TridentWindowingInmemoryStoreTopology.java  |  98 +++++++
 .../trident/windowing/HBaseWindowsStore.java    | 275 +++++++++++++++++++
 .../windowing/HBaseWindowsStoreFactory.java     |  55 ++++
 storm-core/src/jvm/org/apache/storm/Config.java |   8 +
 .../jvm/org/apache/storm/trident/Stream.java    | 207 ++++++++++++--
 .../apache/storm/trident/TridentTopology.java   |   4 +
 .../storm/trident/fluent/UniqueIdGen.java       |  14 +-
 .../storm/trident/operation/builtin/Debug.java  |   4 +-
 .../windowing/AbstractTridentWindowManager.java | 238 ++++++++++++++++
 .../windowing/ITridentWindowManager.java        |  59 ++++
 .../windowing/InMemoryTridentWindowManager.java |  72 +++++
 .../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++
 .../windowing/InMemoryWindowsStoreFactory.java  |  46 ++++
 .../StoreBasedTridentWindowManager.java         | 217 +++++++++++++++
 .../trident/windowing/TridentBatchTuple.java    |  42 +++
 .../windowing/WindowTridentProcessor.java       | 265 ++++++++++++++++++
 .../storm/trident/windowing/WindowsState.java   |  52 ++++
 .../trident/windowing/WindowsStateFactory.java  |  40 +++
 .../trident/windowing/WindowsStateUpdater.java  |  81 ++++++
 .../storm/trident/windowing/WindowsStore.java   |  78 ++++++
 .../trident/windowing/WindowsStoreFactory.java  |  35 +++
 .../windowing/config/BaseWindowConfig.java      |  48 ++++
 .../windowing/config/SlidingCountWindow.java    |  43 +++
 .../windowing/config/SlidingDurationWindow.java |  44 +++
 .../windowing/config/TumblingCountWindow.java   |  43 +++
 .../config/TumblingDurationWindow.java          |  42 +++
 .../trident/windowing/config/WindowConfig.java  |  57 ++++
 .../windowing/strategy/BaseWindowStrategy.java  |  32 +++
 .../strategy/SlidingCountWindowStrategy.java    |  59 ++++
 .../strategy/SlidingDurationWindowStrategy.java |  60 ++++
 .../strategy/TumblingCountWindowStrategy.java   |  60 ++++
 .../TumblingDurationWindowStrategy.java         |  60 ++++
 .../windowing/strategy/WindowStrategy.java      |  45 +++
 .../apache/storm/windowing/TriggerHandler.java  |   2 +-
 .../storm/trident/TridentWindowingTest.java     | 105 +++++++
 37 files changed, 2858 insertions(+), 38 deletions(-)
----------------------------------------------------------------------



[3/9] storm git commit: STORM-676 addressed review comments

Posted by sr...@apache.org.
STORM-676 addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/532bb79b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/532bb79b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/532bb79b

Branch: refs/heads/1.x-branch
Commit: 532bb79b30fdfee04c24d7ed04f63b65c4c44862
Parents: 89c03b8
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Sun Mar 13 10:25:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:46:16 2016 +0530

----------------------------------------------------------------------
 .../trident/TridentHBaseWindowingStoreTopology.java  |  6 ++----
 .../TridentWindowingInmemoryStoreTopology.java       |  7 +++----
 .../hbase/trident/windowing/HBaseWindowsStore.java   |  6 +++---
 .../windowing/AbstractTridentWindowManager.java      | 14 +++++++-------
 .../windowing/InMemoryTridentWindowManager.java      | 10 +++++-----
 .../windowing/InMemoryWindowsStoreFactory.java       | 13 +++++++++++--
 .../windowing/StoreBasedTridentWindowManager.java    | 14 +++++++-------
 .../trident/windowing/WindowTridentProcessor.java    | 15 ++++++++++-----
 .../apache/storm/trident/windowing/WindowsState.java |  6 +++---
 .../storm/trident/windowing/WindowsStateUpdater.java |  8 ++++----
 10 files changed, 55 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 24cc8d9..0ebaa1f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -23,7 +23,6 @@ import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
-import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFunction;
@@ -44,13 +43,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 /**
  *
  */
 public class TridentHBaseWindowingStoreTopology {
-    private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
 
     public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -84,7 +82,7 @@ public class TridentHBaseWindowingStoreTopology {
 
         @Override
         public void execute(TridentTuple tuple, TridentCollector collector) {
-            log.info("##########Echo.execute: " + tuple);
+            LOG.info("##########Echo.execute: " + tuple);
             collector.emit(tuple.getValues());
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 5f0cb4f..a2455a0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,7 +25,6 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseAggregator;
 import org.apache.storm.trident.operation.BaseFunction;
 import org.apache.storm.trident.operation.Function;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -53,10 +52,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * Sample application of trident windowing which uses inmemory store for storing tuples in window.
  */
 public class TridentWindowingInmemoryStoreTopology {
-    private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
 
     public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -91,7 +90,7 @@ public class TridentWindowingInmemoryStoreTopology {
 
         @Override
         public void execute(TridentTuple tuple, TridentCollector collector) {
-            log.info("##########Echo.execute: " + tuple);
+            LOG.info("##########Echo.execute: " + tuple);
             collector.emit(tuple.getValues());
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 47879a4..ff3fbf9 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  *
  */
 public class HBaseWindowsStore implements WindowsStore {
-    private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal<HTable> threadLocalHtable;
@@ -136,7 +136,7 @@ public class HBaseWindowsStore implements WindowsStore {
         for (int i=0; i<results.length; i++) {
             Result result = results[i];
             if(result.isEmpty()) {
-                log.error("Got empty result for key [{}]", keys.get(i));
+                LOG.error("Got empty result for key [{}]", keys.get(i));
                 throw new RuntimeException("Received empty result for key: "+keys.get(i));
             }
             Input input = new Input(result.getValue(family, qualifier));
@@ -267,7 +267,7 @@ public class HBaseWindowsStore implements WindowsStore {
             try {
                 htable.close();
             } catch (IOException e) {
-                log.error(e.getMessage(), e);
+                LOG.error(e.getMessage(), e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index 2941e28..fd7a957 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  */
 public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
-    private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
 
     protected final WindowManager<T> windowManager;
     protected final Aggregator aggregator;
@@ -89,12 +89,12 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
     }
 
     private void preInitialize() {
-        log.debug("Getting current trigger count for this component/task");
+        LOG.debug("Getting current trigger count for this component/task");
         // get trigger count value from store
         Object result = windowStore.get(windowTriggerCountId);
         Integer currentCount = 0;
         if(result == null) {
-            log.info("No current trigger count in windows store.");
+            LOG.info("No current trigger count in windows store.");
         } else {
             currentCount = (Integer) result + 1;
         }
@@ -119,13 +119,13 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
 
         @Override
         public void onExpiry(List<T> expiredEvents) {
-            log.debug("onExpiry is invoked");
+            LOG.debug("onExpiry is invoked");
             onTuplesExpired(expiredEvents);
         }
 
         @Override
         public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
-            log.debug("onActivation is invoked with events size: {}", events.size());
+            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
             // trigger occurred, create an aggregation and keep them in store
             int currentTriggerId = triggerId.incrementAndGet();
             execAggregatorAndStoreResult(currentTriggerId, events);
@@ -230,10 +230,10 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
 
     public void shutdown() {
         try {
-            log.info("window manager [{}] is being shutdown", windowManager);
+            LOG.info("window manager [{}] is being shutdown", windowManager);
             windowManager.shutdown();
         } finally {
-            log.info("window store [{}] is being shutdown", windowStore);
+            LOG.info("window store [{}] is being shutdown", windowStore);
             windowStore.shutdown();
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index cbb30af..e47cc9a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -33,7 +33,7 @@ import java.util.List;
  * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory.
  */
 public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
-    private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
 
     public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
                                         BatchOutputCollector delegateCollector) {
@@ -42,7 +42,7 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
 
     @Override
     protected void initialize() {
-        log.debug("noop in initialize");
+        LOG.debug("noop in initialize");
     }
 
     @Override
@@ -52,17 +52,17 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
 
     @Override
     public void onTuplesExpired(List<TridentTuple> expiredTuples) {
-        log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+        LOG.debug("InMemoryTridentWindowManager.onTuplesExpired");
     }
 
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
         // check if they are already added then ignore these tuples. This batch is replayed.
         if (activeBatches.contains(getBatchTxnId(batchId))) {
-            log.info("Ignoring already added tuples with batch: %s", batchId);
+            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
             return;
         }
 
-        log.debug("Adding tuples to window-manager for batch: ", batchId);
+        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         for (TridentTuple tridentTuple : tuples) {
             windowManager.add(tridentTuple);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
index 32027a9..cf65594 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -18,9 +18,18 @@
  */
 package org.apache.storm.trident.windowing;
 
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
 /**
- * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
- * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater},
+ * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in
+ * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}.
  *
  */
 public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 885e508..58b24a2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
  *
  */
 public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
-    private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
 
     private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
 
@@ -75,14 +75,14 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
             if (key.startsWith(windowTupleTaskId)) {
                 int tupleIndexValue = lastPart(key);
                 String batchId = secondLastPart(key);
-                log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+                LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
                 windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue));
             } else if (key.startsWith(windowTriggerTaskId)) {
                 triggerKeys.add(key);
-                log.debug("Received trigger with key [{}]", key);
+                LOG.debug("Received trigger with key [{}]", key);
             } else if(key.startsWith(windowTriggerInprocessId)) {
                 attemptedTriggerKeys.add(key);
-                log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
+                LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
             }
         }
 
@@ -99,7 +99,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
         for (Object triggerObject : triggerObjects) {
             int id = lastPart(triggerKeys.get(i++));
             if(!triggersToBeIgnored.contains(id)) {
-                log.info("Adding pending trigger value [{}]", triggerObject);
+                LOG.info("Adding pending trigger value [{}]", triggerObject);
                 pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
             }
         }
@@ -131,11 +131,11 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
         // check if they are already added then ignore these tuples. This batch is replayed.
         if (activeBatches.contains(getBatchTxnId(batchId))) {
-            log.info("Ignoring already added tuples with batch: %s", batchId);
+            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
             return;
         }
 
-        log.debug("Adding tuples to window-manager for batch: ", batchId);
+        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         List<WindowsStore.Entry> entries = new ArrayList<>();
         for (int i = 0; i < tuples.size(); i++) {
             String key = keyOf(batchId);

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index c2d9362..8898b13 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -46,7 +46,7 @@ import java.util.Queue;
  *
  */
 public class WindowTridentProcessor implements TridentProcessor {
-    private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
 
     public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
     public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
@@ -127,8 +127,13 @@ public class WindowTridentProcessor implements TridentProcessor {
 
     @Override
     public void cleanup() {
-        log.info("shutting down window manager");
-        tridentWindowManager.shutdown();
+        LOG.info("shutting down window manager");
+        try {
+            tridentWindowManager.shutdown();
+        } catch (Exception ex) {
+            LOG.error("Error occurred while cleaning up window processor", ex);
+            throw ex;
+        }
     }
 
     @Override
@@ -150,7 +155,7 @@ public class WindowTridentProcessor implements TridentProcessor {
         Object batchId = processorContext.batchId;
         Object batchTxnId = getBatchTxnId(batchId);
 
-        log.debug("Received finishBatch of : {} ", batchId);
+        LOG.debug("Received finishBatch of : [{}] ", batchId);
         // get all the tuples in a batch and add it to trident-window-manager
         List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
         tridentWindowManager.addTuplesBatch(batchId, tuples);
@@ -171,7 +176,7 @@ public class WindowTridentProcessor implements TridentProcessor {
         if(triggerValues == null) {
             pendingTriggerIds = new ArrayList<>();
             Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
-            log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size());
+            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
             try {
                 Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                 List<Object> values = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
index 378d24f..faf73d6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class WindowsState implements State {
-    private static final Logger log = LoggerFactory.getLogger(WindowsState.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WindowsState.class);
 
     private Long currentTxId;
 
@@ -38,12 +38,12 @@ public class WindowsState implements State {
     @Override
     public void beginCommit(Long txId) {
         currentTxId = txId;
-        log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+        LOG.debug(" WindowsState.beginCommit:: [{}] ", txId);
     }
 
     @Override
     public void commit(Long txId) {
-        log.debug("WindowsState.commit :: [{}]", txId);
+        LOG.debug("WindowsState.commit :: [{}]", txId);
     }
 
     public Long getCurrentTxId() {

http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
index 45ac885..6664b41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -36,7 +36,7 @@ import java.util.Map;
  */
 public class WindowsStateUpdater implements StateUpdater<WindowsState> {
 
-    private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WindowsStateUpdater.class);
 
     private final WindowsStoreFactory windowStoreFactory;
     private WindowsStore windowsStore;
@@ -48,7 +48,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
     @Override
     public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) {
         Long currentTxId = state.getCurrentTxId();
-        log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId);
+        LOG.debug("Removing triggers using WindowStateUpdater, txnId: [{}] ", currentTxId);
         for (TridentTuple tuple : tuples) {
             try {
                 Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
@@ -58,11 +58,11 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
                 WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue;
                 String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId;
 
-                log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+                LOG.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
 
                 windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey));
             } catch (Exception ex) {
-                log.warn(ex.getMessage());
+                LOG.warn(ex.getMessage());
                 collector.reportError(ex);
                 throw new FailedException(ex);
             }


[4/9] storm git commit: STORM-676 Addressed review comments on API aligning with core window API

Posted by sr...@apache.org.
STORM-676 Addressed review comments on API aligning with core window API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd02bcfd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd02bcfd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd02bcfd

Branch: refs/heads/1.x-branch
Commit: dd02bcfdcfe6f92fef055a933722a7b485c8e613
Parents: 532bb79
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Tue Mar 15 14:18:30 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:46:26 2016 +0530

----------------------------------------------------------------------
 .../jvm/org/apache/storm/trident/Stream.java    | 22 ++++----------------
 1 file changed, 4 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dd02bcfd/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index b3a4446..47b087a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -611,20 +611,6 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     /**
      * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
      *
-     * @param windowCount represents window tuples count
-     * @param inputFields projected fields for aggregator
-     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
-     * @param functionFields fields of values to emit with aggregation.
-     *
-     * @return
-     */
-    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
-        return window(TumblingCountWindow.of(windowCount), inputFields, aggregator, functionFields);
-    }
-
-    /**
-     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
-     *
      * @param windowCount represents no of tuples in the window
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
@@ -633,7 +619,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      *
      * @return
      */
-    public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
                                       Fields inputFields, Aggregator aggregator, Fields functionFields) {
         return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
     }
@@ -651,7 +637,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      *
      * @return
      */
-    public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
         return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
     }
@@ -667,7 +653,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      *
      * @return
      */
-    public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
         return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
     }
@@ -685,7 +671,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      *
      * @return
      */
-    public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
                                     WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
         return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
     }


[5/9] storm git commit: STORM-676 Addressed review comments from Arun

Posted by sr...@apache.org.
STORM-676 Addressed review comments from Arun


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a96f20f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a96f20f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a96f20f

Branch: refs/heads/1.x-branch
Commit: 3a96f20f09b3eaccc59d333a581c3b0c7d345ccc
Parents: dd02bcf
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 11:35:37 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:46:49 2016 +0530

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  | 31 ------------
 .../TridentHBaseWindowingStoreTopology.java     | 49 ++++--------------
 .../TridentWindowingInmemoryStoreTopology.java  | 53 ++++----------------
 .../windowing/HBaseWindowsStoreFactory.java     |  3 ++
 .../jvm/org/apache/storm/trident/Stream.java    | 12 ++---
 .../windowing/AbstractTridentWindowManager.java |  1 -
 .../windowing/InMemoryTridentWindowManager.java |  6 ---
 .../StoreBasedTridentWindowManager.java         |  6 ---
 8 files changed, 28 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index e702a5d..6053595 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -35,7 +35,6 @@
     <!-- see comment below... This fixes an annoyance with intellij -->
     <provided.scope>provided</provided.scope>
     <hbase.version>0.98.4-hadoop2</hbase.version>
-    <hbase.version>1.1.2</hbase.version>
   </properties>
 
   <profiles>
@@ -175,36 +174,6 @@
       <artifactId>storm-redis</artifactId>
       <version>${project.version}</version>
     </dependency>
-      <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-server</artifactId>
-          <version>${hbase.version}</version>
-          <exclusions>
-              <exclusion>
-                  <groupId>org.slf4j</groupId>
-                  <artifactId>slf4j-log4j12</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>org.apache.zookeeper</groupId>
-                  <artifactId>zookeeper</artifactId>
-              </exclusion>
-          </exclusions>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-client</artifactId>
-          <version>${hbase.version}</version>
-          <exclusions>
-              <exclusion>
-                  <groupId>org.slf4j</groupId>
-                  <artifactId>slf4j-log4j12</artifactId>
-              </exclusion>
-              <exclusion>
-                  <groupId>org.apache.zookeeper</groupId>
-                  <artifactId>zookeeper</artifactId>
-              </exclusion>
-          </exclusions>
-      </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 0ebaa1f..ba18f7c 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -25,13 +25,10 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.testing.CountAsAggregator;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.WindowsStoreFactory;
 import org.apache.storm.trident.windowing.config.TumblingCountWindow;
@@ -42,9 +39,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.Map;
 
 /**
+ * Sample application of trident windowing which uses {@link HBaseWindowsStoreFactory}'s store for storing tuples in window.
  *
  */
 public class TridentHBaseWindowingStoreTopology {
@@ -61,46 +58,20 @@ public class TridentHBaseWindowingStoreTopology {
         Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
                 new Split(), new Fields("word"))
                 .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-//                .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-                .each(new Fields("count"), new Debug())
-                .each(new Fields("count"), new Echo(), new Fields("ct"));
+                .peek(new Consumer() {
+                    @Override
+                    public void accept(TridentTuple input) {
+                        LOG.info("Received tuple: [{}]", input);
+                    }
+                });
 
         return topology.build();
     }
 
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
-    static class Echo implements Function {
-
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            LOG.info("##########Echo.execute: " + tuple);
-            collector.emit(tuple.getValues());
-        }
-
-        @Override
-        public void prepare(Map conf, TridentOperationContext context) {
-
-        }
-
-        @Override
-        public void cleanup() {
-
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
-        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2);
+        conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);
 
         // window-state table should already be created with cf:tuples column
         HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index a2455a0..5aec01d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,21 +25,14 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.Function;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.TridentOperationContext;
-import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.testing.CountAsAggregator;
 import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
 import org.apache.storm.trident.windowing.WindowsStoreFactory;
-import org.apache.storm.trident.windowing.config.SlidingCountWindow;
-import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
-import org.apache.storm.trident.windowing.config.TumblingCountWindow;
-import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.config.*;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
@@ -48,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -68,43 +60,16 @@ public class TridentWindowingInmemoryStoreTopology {
         Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
                 new Split(), new Fields("word"))
                 .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
-//                .aggregate(new CountAsAggregator(), new Fields("count"))
-                .each(new Fields("count"), new Debug())
-                .each(new Fields("count"), new Echo(), new Fields("ct"))
-                .each(new Fields("ct"), new Debug());
+                .peek(new Consumer() {
+                    @Override
+                    public void accept(TridentTuple input) {
+                        LOG.info("Received tuple: [{}]", input);
+                    }
+                });
 
         return topology.build();
     }
 
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
-    static class Echo implements Function {
-
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            LOG.info("##########Echo.execute: " + tuple);
-            collector.emit(tuple.getValues());
-        }
-
-        @Override
-        public void prepare(Map conf, TridentOperationContext context) {
-
-        }
-
-        @Override
-        public void cleanup() {
-
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Config conf = new Config();
         WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index 56fad58..a49bc87 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -25,6 +25,9 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
 
 import java.util.Map;
 
+/**
+ *
+ */
 public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
     private final Map<String, Object> config;
     private final String tableName;

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 47b087a..444b42a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -629,7 +629,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * and slides the window with {@code slideCount}.
      *
      * @param windowCount represents tuples count of a window
-     * @param slideCount represents sliding count window
+     * @param slideCount the number of tuples after which the window slides
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
@@ -663,7 +663,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * and completes a window at {@code windowDuration}
      *
      * @param windowDuration represents window duration configuration
-     * @param slideDuration represents sliding duration  configuration
+     * @param slidingInterval the time duration after which the window slides
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
@@ -671,9 +671,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      *
      * @return
      */
-    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
                                     WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
-        return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+        return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index fd7a957..aac18d3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -55,7 +55,6 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
     protected final String windowTaskId;
     protected final WindowsStore windowStore;
 
-    protected final Set<String> activeBatches = new HashSet<>();
     protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
     protected final AtomicInteger triggerId = new AtomicInteger();
     private final String windowTriggerCountId;

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index e47cc9a..69eb39e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -56,12 +56,6 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
     }
 
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
-        // check if they are already added then ignore these tuples. This batch is replayed.
-        if (activeBatches.contains(getBatchTxnId(batchId))) {
-            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
-            return;
-        }
-
         LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         for (TridentTuple tridentTuple : tuples) {
             windowManager.add(tridentTuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/3a96f20f/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 58b24a2..87c1a0f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -129,12 +129,6 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
     }
 
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
-        // check if they are already added then ignore these tuples. This batch is replayed.
-        if (activeBatches.contains(getBatchTxnId(batchId))) {
-            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
-            return;
-        }
-
         LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         List<WindowsStore.Entry> entries = new ArrayList<>();
         for (int i = 0; i < tuples.size(); i++) {