You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/31 22:01:55 UTC
[2/4] samza git commit: Close iterators to time-series store on
deletes
Close iterators to time-series store on deletes
Author: Jagadish <jv...@linkedin.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #787 from vjagadish1989/website-reorg29
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a000206
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a000206
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a000206
Branch: refs/heads/1.0.0
Commit: 7a000206f52617dec92b67efb91b59a2ec12c795
Parents: a664fb5
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Oct 30 18:48:40 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Oct 31 14:30:14 2018 -0700
----------------------------------------------------------------------
.../org/apache/samza/operators/impl/WindowOperatorImpl.java | 4 ++--
.../samza/operators/impl/store/TimeSeriesStoreImpl.java | 9 ++++++---
2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/7a000206/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index c09c5f8..0241d9e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -379,15 +379,15 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
* @return a list of all elements returned by the iterator
*/
static <V> List<V> toList(ClosableIterator<V> iterator) {
+ Preconditions.checkNotNull(iterator);
+
List<V> values = new ArrayList<>();
try {
while (iterator.hasNext()) {
values.add(iterator.next());
}
} finally {
- if (iterator != null) {
iterator.close();
- }
}
return Collections.unmodifiableList(values);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/7a000206/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
index 10a5967..b8cd82f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -154,10 +154,13 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>();
KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey);
- while (range.hasNext()) {
- keysToDelete.add(range.next().getKey());
+ try {
+ while (range.hasNext()) {
+ keysToDelete.add(range.next().getKey());
+ }
+ } finally {
+ range.close();
}
-
kvStore.deleteAll(keysToDelete);
}