You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/28 08:05:28 UTC
kafka git commit: HOTFIX: fix threading issue in MeteredKeyValueStore
Repository: kafka
Updated Branches:
refs/heads/trunk 6bd730264 -> 4059fa576
HOTFIX: fix threading issue in MeteredKeyValueStore
`MeteredKeyValueStore` wasn't thread safe. Interleaving operations could modify the state, i.e, the `key` and/or `value` which could result in incorrect behaviour.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3588 from dguy/hotfix-metered-kv-store
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4059fa57
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4059fa57
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4059fa57
Branch: refs/heads/trunk
Commit: 4059fa5763252c6869b6b7c682d7b5f9d4dc3a87
Parents: 6bd7302
Author: Damian Guy <da...@gmail.com>
Authored: Fri Jul 28 09:05:23 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Jul 28 09:05:23 2017 +0100
----------------------------------------------------------------------
.../state/internals/MeteredKeyValueStore.java | 174 +++++++++++--------
1 file changed, 99 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4059fa57/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 2f280fc..fd80b1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,7 +39,6 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
private final KeyValueStore<K, V> inner;
private final String metricScope;
protected final Time time;
-
private Sensor putTime;
private Sensor putIfAbsentTime;
private Sensor getTime;
@@ -48,57 +47,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
private Sensor allTime;
private Sensor rangeTime;
private Sensor flushTime;
- private Sensor restoreTime;
- private StreamsMetricsImpl metrics;
-
-
- private K key;
- private V value;
- private Runnable getDelegate = new Runnable() {
- @Override
- public void run() {
- value = inner.get(key);
- }
- };
- private Runnable putDelegate = new Runnable() {
- @Override
- public void run() {
- inner.put(key, value);
- }
- };
- private Runnable putIfAbsentDelegate = new Runnable() {
- @Override
- public void run() {
- value = inner.putIfAbsent(key, value);
- }
- };
- private List<KeyValue<K, V>> entries;
- private Runnable putAllDelegate = new Runnable() {
- @Override
- public void run() {
- inner.putAll(entries);
- }
- };
- private Runnable deleteDelegate = new Runnable() {
- @Override
- public void run() {
- value = inner.delete(key);
- }
- };
- private Runnable flushDelegate = new Runnable() {
- @Override
- public void run() {
- inner.flush();
- }
- };
+ private StreamsMetrics metrics;
private ProcessorContext context;
private StateStore root;
- private Runnable initDelegate = new Runnable() {
- @Override
- public void run() {
- inner.init(context, root);
- }
- };
// always wrap the store with the metered store
public MeteredKeyValueStore(final KeyValueStore<K, V> inner,
@@ -115,7 +66,7 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
final String name = name();
this.context = context;
this.root = root;
- this.metrics = (StreamsMetricsImpl) context.metrics();
+ this.metrics = context.metrics();
this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-if-absent", Sensor.RecordingLevel.DEBUG);
this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
@@ -124,10 +75,21 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "all", Sensor.RecordingLevel.DEBUG);
this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "range", Sensor.RecordingLevel.DEBUG);
this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
- this.restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
+ final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
// register and possibly restore the state from the logs
- metrics.measureLatencyNs(time, initDelegate, this.restoreTime);
+ if (restoreTime.shouldRecord()) {
+ measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ inner.init(MeteredKeyValueStore.this.context, MeteredKeyValueStore.this.root);
+ return null;
+ }
+ }, restoreTime);
+ } else {
+ inner.init(MeteredKeyValueStore.this.context, MeteredKeyValueStore.this.root);
+ }
+
}
@Override
@@ -135,39 +97,81 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
return inner.approximateNumEntries();
}
+ interface Action<V> {
+ V execute();
+ }
+
@Override
- public V get(K key) {
- this.key = key;
- metrics.measureLatencyNs(time, getDelegate, this.getTime);
- return value;
+ public V get(final K key) {
+ if (getTime.shouldRecord()) {
+ return measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ return inner.get(key);
+ }
+ }, getTime);
+ } else {
+ return inner.get(key);
+ }
}
@Override
- public void put(K key, V value) {
- this.key = key;
- this.value = value;
- metrics.measureLatencyNs(time, putDelegate, this.putTime);
+ public void put(final K key, final V value) {
+ if (putTime.shouldRecord()) {
+ measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ inner.put(key, value);
+ return null;
+ }
+ }, putTime);
+ } else {
+ inner.put(key, value);
+ }
}
@Override
- public V putIfAbsent(K key, V value) {
- this.key = key;
- this.value = value;
- metrics.measureLatencyNs(time, putIfAbsentDelegate, this.putIfAbsentTime);
- return this.value;
+ public V putIfAbsent(final K key, final V value) {
+ if (putIfAbsentTime.shouldRecord()) {
+ return measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ return inner.putIfAbsent(key, value);
+ }
+ }, putIfAbsentTime);
+ } else {
+ return inner.putIfAbsent(key, value);
+ }
+
}
@Override
- public void putAll(List<KeyValue<K, V>> entries) {
- this.entries = entries;
- metrics.measureLatencyNs(time, putAllDelegate, this.putAllTime);
+ public void putAll(final List<KeyValue<K, V>> entries) {
+ if (putAllTime.shouldRecord()) {
+ measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ inner.putAll(entries);
+ return null;
+ }
+ }, putAllTime);
+ } else {
+ inner.putAll(entries);
+ }
}
@Override
- public V delete(K key) {
- this.key = key;
- metrics.measureLatencyNs(time, deleteDelegate, this.deleteTime);
- return value;
+ public V delete(final K key) {
+ if (deleteTime.shouldRecord()) {
+ return measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ return inner.delete(key);
+ }
+ }, deleteTime);
+ } else {
+ return inner.delete(key);
+ }
}
@Override
@@ -182,7 +186,27 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
@Override
public void flush() {
- metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
+ if (flushTime.shouldRecord()) {
+ measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ inner.flush();
+ return null;
+ }
+ }, flushTime);
+ } else {
+ inner.flush();
+ }
+
+ }
+
+ private V measureLatency(final Action<V> action, final Sensor sensor) {
+ final long startNs = time.nanoseconds();
+ try {
+ return action.execute();
+ } finally {
+ metrics.recordLatency(sensor, startNs, time.nanoseconds());
+ }
}
private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {