You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/28 08:05:28 UTC

kafka git commit: HOTFIX: fix threading issue in MeteredKeyValueStore

Repository: kafka
Updated Branches:
  refs/heads/trunk 6bd730264 -> 4059fa576


HOTFIX: fix threading issue in MeteredKeyValueStore

`MeteredKeyValueStore` wasn't thread safe. Interleaving operations could modify the state, i.e, the `key` and/or `value` which could result in incorrect behaviour.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #3588 from dguy/hotfix-metered-kv-store


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4059fa57
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4059fa57
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4059fa57

Branch: refs/heads/trunk
Commit: 4059fa5763252c6869b6b7c682d7b5f9d4dc3a87
Parents: 6bd7302
Author: Damian Guy <da...@gmail.com>
Authored: Fri Jul 28 09:05:23 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Jul 28 09:05:23 2017 +0100

----------------------------------------------------------------------
 .../state/internals/MeteredKeyValueStore.java   | 174 +++++++++++--------
 1 file changed, 99 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4059fa57/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 2f280fc..fd80b1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -39,7 +39,6 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     private final KeyValueStore<K, V> inner;
     private final String metricScope;
     protected final Time time;
-
     private Sensor putTime;
     private Sensor putIfAbsentTime;
     private Sensor getTime;
@@ -48,57 +47,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     private Sensor allTime;
     private Sensor rangeTime;
     private Sensor flushTime;
-    private Sensor restoreTime;
-    private StreamsMetricsImpl metrics;
-
-
-    private K key;
-    private V value;
-    private Runnable getDelegate = new Runnable() {
-        @Override
-        public void run() {
-            value = inner.get(key);
-        }
-    };
-    private Runnable putDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.put(key, value);
-        }
-    };
-    private Runnable putIfAbsentDelegate = new Runnable() {
-        @Override
-        public void run() {
-            value = inner.putIfAbsent(key, value);
-        }
-    };
-    private List<KeyValue<K, V>> entries;
-    private Runnable putAllDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.putAll(entries);
-        }
-    };
-    private Runnable deleteDelegate = new Runnable() {
-        @Override
-        public void run() {
-            value = inner.delete(key);
-        }
-    };
-    private Runnable flushDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.flush();
-        }
-    };
+    private StreamsMetrics metrics;
     private ProcessorContext context;
     private StateStore root;
-    private Runnable initDelegate = new Runnable() {
-        @Override
-        public void run() {
-            inner.init(context, root);
-        }
-    };
 
     // always wrap the store with the metered store
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner,
@@ -115,7 +66,7 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
         final String name = name();
         this.context = context;
         this.root = root;
-        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
         this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-if-absent", Sensor.RecordingLevel.DEBUG);
         this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
@@ -124,10 +75,21 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
         this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "all", Sensor.RecordingLevel.DEBUG);
         this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "range", Sensor.RecordingLevel.DEBUG);
         this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
-        this.restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
 
         // register and possibly restore the state from the logs
-        metrics.measureLatencyNs(time, initDelegate, this.restoreTime);
+        if (restoreTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.init(MeteredKeyValueStore.this.context, MeteredKeyValueStore.this.root);
+                    return null;
+                }
+            }, restoreTime);
+        } else {
+            inner.init(MeteredKeyValueStore.this.context, MeteredKeyValueStore.this.root);
+        }
+
     }
 
     @Override
@@ -135,39 +97,81 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
         return inner.approximateNumEntries();
     }
 
+    interface Action<V> {
+        V execute();
+    }
+
     @Override
-    public V get(K key) {
-        this.key = key;
-        metrics.measureLatencyNs(time, getDelegate, this.getTime);
-        return value;
+    public V get(final K key) {
+        if (getTime.shouldRecord()) {
+            return measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    return inner.get(key);
+                }
+            }, getTime);
+        } else {
+            return inner.get(key);
+        }
     }
 
     @Override
-    public void put(K key, V value) {
-        this.key = key;
-        this.value = value;
-        metrics.measureLatencyNs(time, putDelegate, this.putTime);
+    public void put(final K key, final V value) {
+        if (putTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.put(key, value);
+                    return null;
+                }
+            }, putTime);
+        } else {
+            inner.put(key, value);
+        }
     }
 
     @Override
-    public V putIfAbsent(K key, V value) {
-        this.key = key;
-        this.value = value;
-        metrics.measureLatencyNs(time, putIfAbsentDelegate, this.putIfAbsentTime);
-        return this.value;
+    public V putIfAbsent(final K key, final V value) {
+        if (putIfAbsentTime.shouldRecord()) {
+            return measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    return inner.putIfAbsent(key, value);
+                }
+            }, putIfAbsentTime);
+        } else {
+            return inner.putIfAbsent(key, value);
+        }
+
     }
 
     @Override
-    public void putAll(List<KeyValue<K, V>> entries) {
-        this.entries = entries;
-        metrics.measureLatencyNs(time, putAllDelegate, this.putAllTime);
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        if (putAllTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.putAll(entries);
+                    return null;
+                }
+            }, putAllTime);
+        } else {
+            inner.putAll(entries);
+        }
     }
 
     @Override
-    public V delete(K key) {
-        this.key = key;
-        metrics.measureLatencyNs(time, deleteDelegate, this.deleteTime);
-        return value;
+    public V delete(final K key) {
+        if (deleteTime.shouldRecord()) {
+            return measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    return inner.delete(key);
+                }
+            }, deleteTime);
+        } else {
+            return inner.delete(key);
+        }
     }
 
     @Override
@@ -182,7 +186,27 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
 
     @Override
     public void flush() {
-        metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
+        if (flushTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.flush();
+                    return null;
+                }
+            }, flushTime);
+        } else {
+            inner.flush();
+        }
+
+    }
+
+    private V measureLatency(final Action<V> action, final Sensor sensor) {
+        final long startNs = time.nanoseconds();
+        try {
+            return action.execute();
+        } finally {
+            metrics.recordLatency(sensor, startNs, time.nanoseconds());
+        }
     }
 
     private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {