You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/31 22:01:55 UTC

[2/4] samza git commit: Close iterators to time-series store on deletes

Close iterators to time-series store on deletes

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #787 from vjagadish1989/website-reorg29


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a000206
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a000206
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a000206

Branch: refs/heads/1.0.0
Commit: 7a000206f52617dec92b67efb91b59a2ec12c795
Parents: a664fb5
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Oct 30 18:48:40 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 31 14:30:14 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/operators/impl/WindowOperatorImpl.java | 4 ++--
 .../samza/operators/impl/store/TimeSeriesStoreImpl.java     | 9 ++++++---
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7a000206/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index c09c5f8..0241d9e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -379,15 +379,15 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
    * @return a list of all elements returned by the iterator
    */
   static <V>  List<V> toList(ClosableIterator<V> iterator) {
+    Preconditions.checkNotNull(iterator);
+
     List<V> values = new ArrayList<>();
     try {
       while (iterator.hasNext()) {
         values.add(iterator.next());
       }
     } finally {
-      if (iterator != null) {
         iterator.close();
-      }
     }
     return Collections.unmodifiableList(values);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/7a000206/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
index 10a5967..b8cd82f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -154,10 +154,13 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
     List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>();
 
     KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
-    while (range.hasNext()) {
-      keysToDelete.add(range.next().getKey());
+    try {
+      while (range.hasNext()) {
+        keysToDelete.add(range.next().getKey());
+      }
+    } finally {
+      range.close();
     }
-
     kvStore.deleteAll(keysToDelete);
   }