You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/23 23:09:04 UTC

samza git commit: SAMZA-1065: Change the commit order to support at least once processing when using local state store for deduping.

Repository: samza
Updated Branches:
  refs/heads/master 52ff04197 -> 965a645d8


SAMZA-1065: Change the commit order to support at least once processing when using local state store for deduping.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Yi Pan <ni...@gmail.com>, Jagadish <jagadish1989@gmail.com>

Closes #35 from prateekm/commit-order


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/965a645d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/965a645d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/965a645d

Branch: refs/heads/master
Commit: 965a645d84f7118cb68f2994233ed61f67dbc690
Parents: 52ff041
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Fri Dec 23 15:08:55 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Dec 23 15:08:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/samza/container/TaskInstance.scala     | 14 +++++++-------
 .../org/apache/samza/storage/kv/LoggedStore.scala     |  8 ++++----
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/965a645d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index b068856..26a8f5f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -191,19 +191,19 @@ class TaskInstance[T](
   }
 
   def commit {
-    trace("Flushing state stores for taskName: %s" format taskName)
-
     metrics.commits.inc
 
-    if (storageManager != null) {
-      storageManager.flush
-    }
-
     trace("Flushing producers for taskName: %s" format taskName)
 
     collector.flush
 
-    trace("Committing offset manager for taskName: %s" format taskName)
+    trace("Flushing state stores for taskName: %s" format taskName)
+
+    if (storageManager != null) {
+      storageManager.flush
+    }
+
+    trace("Checkpointing offsets for taskName: %s" format taskName)
 
     offsetManager.checkpoint(taskName)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/965a645d/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index dc5cbcd..33ff41b 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -61,8 +61,8 @@ class LoggedStore[K, V](
    */
   def put(key: K, value: V) {
     metrics.puts.inc
-    store.put(key, value)
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, value))
+    store.put(key, value)
   }
 
   /**
@@ -70,12 +70,12 @@ class LoggedStore[K, V](
    */
   def putAll(entries: java.util.List[Entry[K, V]]) {
     metrics.puts.inc(entries.size)
-    store.putAll(entries)
     val iter = entries.iterator
     while (iter.hasNext) {
       val curr = iter.next
       collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, curr.getKey, curr.getValue))
     }
+    store.putAll(entries)
   }
 
   /**
@@ -83,8 +83,8 @@ class LoggedStore[K, V](
    */
   def delete(key: K) {
     metrics.deletes.inc
-    store.delete(key)
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, null))
+    store.delete(key)
   }
 
   /**
@@ -92,11 +92,11 @@ class LoggedStore[K, V](
    */
   def deleteAll(keys: java.util.List[K]) = {
     metrics.deletes.inc(keys.size)
-    store.deleteAll(keys)
     val keysIterator = keys.iterator
     while (keysIterator.hasNext) {
       collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, keysIterator.next, null))
     }
+    store.deleteAll(keys)
   }
 
   def flush {