You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/23 23:09:04 UTC
samza git commit: SAMZA-1065: Change the commit order to support at-least-once processing when using local state store for deduping.
Repository: samza
Updated Branches:
refs/heads/master 52ff04197 -> 965a645d8
SAMZA-1065: Change the commit order to support at-least-once processing when using a local state store for deduping.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Yi Pan <ni...@gmail.com>, Jagadish <jagadish1989@gmail.com>
Closes #35 from prateekm/commit-order
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/965a645d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/965a645d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/965a645d
Branch: refs/heads/master
Commit: 965a645d84f7118cb68f2994233ed61f67dbc690
Parents: 52ff041
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Fri Dec 23 15:08:55 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Dec 23 15:08:55 2016 -0800
----------------------------------------------------------------------
.../org/apache/samza/container/TaskInstance.scala | 14 +++++++-------
.../org/apache/samza/storage/kv/LoggedStore.scala | 8 ++++----
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/965a645d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index b068856..26a8f5f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -191,19 +191,19 @@ class TaskInstance[T](
}
def commit {
- trace("Flushing state stores for taskName: %s" format taskName)
-
metrics.commits.inc
- if (storageManager != null) {
- storageManager.flush
- }
-
trace("Flushing producers for taskName: %s" format taskName)
collector.flush
- trace("Committing offset manager for taskName: %s" format taskName)
+ trace("Flushing state stores for taskName: %s" format taskName)
+
+ if (storageManager != null) {
+ storageManager.flush
+ }
+
+ trace("Checkpointing offsets for taskName: %s" format taskName)
offsetManager.checkpoint(taskName)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/965a645d/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index dc5cbcd..33ff41b 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -61,8 +61,8 @@ class LoggedStore[K, V](
*/
def put(key: K, value: V) {
metrics.puts.inc
- store.put(key, value)
collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, value))
+ store.put(key, value)
}
/**
@@ -70,12 +70,12 @@ class LoggedStore[K, V](
*/
def putAll(entries: java.util.List[Entry[K, V]]) {
metrics.puts.inc(entries.size)
- store.putAll(entries)
val iter = entries.iterator
while (iter.hasNext) {
val curr = iter.next
collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, curr.getKey, curr.getValue))
}
+ store.putAll(entries)
}
/**
@@ -83,8 +83,8 @@ class LoggedStore[K, V](
*/
def delete(key: K) {
metrics.deletes.inc
- store.delete(key)
collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, null))
+ store.delete(key)
}
/**
@@ -92,11 +92,11 @@ class LoggedStore[K, V](
*/
def deleteAll(keys: java.util.List[K]) = {
metrics.deletes.inc(keys.size)
- store.deleteAll(keys)
val keysIterator = keys.iterator
while (keysIterator.hasNext) {
collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, keysIterator.next, null))
}
+ store.deleteAll(keys)
}
def flush {