You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/07 03:42:54 UTC
[kafka] branch trunk updated: KAFKA-6538: Changes to enhance
ByteStore exceptions thrown from RocksDBStore with more human readable info
(#5103)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1509679 KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info (#5103)
1509679 is described below
commit 150967994a69b2e26f5f68efb2a54a8522ae641d
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Thu Jun 7 09:12:41 2018 +0530
KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info (#5103)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../state/internals/InnerMeteredKeyValueStore.java | 72 +++++++++++++---------
.../state/internals/MeteredSessionStore.java | 7 +++
.../state/internals/MeteredWindowStore.java | 4 ++
.../streams/state/internals/RocksDBStore.java | 9 ++-
4 files changed, 61 insertions(+), 31 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 14464e0..200b2d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -173,30 +174,40 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
@Override
public V get(final K key) {
- if (getTime.shouldRecord()) {
- return measureLatency(new Action<V>() {
- @Override
- public V execute() {
- return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
- }
- }, getTime);
- } else {
- return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+ try {
+ if (getTime.shouldRecord()) {
+ return measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+ }
+ }, getTime);
+ } else {
+ return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+ }
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key);
+ throw new ProcessorStateException(message, e);
}
}
@Override
public void put(final K key, final V value) {
- if (putTime.shouldRecord()) {
- measureLatency(new Action<V>() {
- @Override
- public V execute() {
- inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
- return null;
- }
- }, putTime);
- } else {
- inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+ try {
+ if (putTime.shouldRecord()) {
+ measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+ return null;
+ }
+ }, putTime);
+ } else {
+ inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+ }
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key, value);
+ throw new ProcessorStateException(message, e);
}
}
@@ -232,15 +243,20 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
@Override
public V delete(final K key) {
- if (deleteTime.shouldRecord()) {
- return measureLatency(new Action<V>() {
- @Override
- public V execute() {
- return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
- }
- }, deleteTime);
- } else {
- return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+ try {
+ if (deleteTime.shouldRecord()) {
+ return measureLatency(new Action<V>() {
+ @Override
+ public V execute() {
+ return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+ }
+ }, deleteTime);
+ } else {
+ return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+ }
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key);
+ throw new ProcessorStateException(message, e);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 5636219..3e881ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -128,6 +129,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
try {
final Bytes key = keyBytes(sessionKey.key());
inner.remove(new Windowed<>(key, sessionKey.window()));
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), sessionKey.key());
+ throw new ProcessorStateException(message, e);
} finally {
this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
}
@@ -140,6 +144,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
try {
final Bytes key = keyBytes(sessionKey.key());
this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), sessionKey.key(), aggregate);
+ throw new ProcessorStateException(message, e);
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2487854..62ed6c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -94,6 +95,9 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
final long startNs = time.nanoseconds();
try {
inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key, value);
+ throw new ProcessorStateException(message, e);
} finally {
metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ff6c56a..cfef035 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -224,7 +224,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
try {
return this.db.get(rawKey);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
+ // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+ throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e);
}
}
@@ -300,13 +301,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
try {
db.delete(wOptions, rawKey);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while removing key from store " + this.name, e);
+ // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+ throw new ProcessorStateException("Error while removing key %s from store " + this.name, e);
}
} else {
try {
db.put(wOptions, rawKey, rawValue);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e);
+ // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+ throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e);
}
}
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.