You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/31 01:48:43 UTC

samza git commit: Close iterators to time-series store on deletes

Repository: samza
Updated Branches:
  refs/heads/master d17134e0d -> 7836bf08c


Close iterators to time-series store on deletes

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #787 from vjagadish1989/website-reorg29


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7836bf08
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7836bf08
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7836bf08

Branch: refs/heads/master
Commit: 7836bf08c54f48cdac50f57e89c4eff8ec1925ea
Parents: d17134e
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Oct 30 18:48:40 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Oct 30 18:48:40 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/operators/impl/WindowOperatorImpl.java | 4 ++--
 .../samza/operators/impl/store/TimeSeriesStoreImpl.java     | 9 ++++++---
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7836bf08/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index c09c5f8..0241d9e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -379,15 +379,15 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
    * @return a list of all elements returned by the iterator
    */
   static <V>  List<V> toList(ClosableIterator<V> iterator) {
+    Preconditions.checkNotNull(iterator);
+
     List<V> values = new ArrayList<>();
     try {
       while (iterator.hasNext()) {
         values.add(iterator.next());
       }
     } finally {
-      if (iterator != null) {
         iterator.close();
-      }
     }
     return Collections.unmodifiableList(values);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/7836bf08/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
index 10a5967..b8cd82f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -154,10 +154,13 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
     List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>();
 
     KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
-    while (range.hasNext()) {
-      keysToDelete.add(range.next().getKey());
+    try {
+      while (range.hasNext()) {
+        keysToDelete.add(range.next().getKey());
+      }
+    } finally {
+      range.close();
     }
-
     kvStore.deleteAll(keysToDelete);
   }