You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/07 03:42:54 UTC

[kafka] branch trunk updated: KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info (#5103)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1509679  KAFKA-6538: Changes to enhance  ByteStore exceptions thrown from RocksDBStore with more human readable info (#5103)
1509679 is described below

commit 150967994a69b2e26f5f68efb2a54a8522ae641d
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Thu Jun 7 09:12:41 2018 +0530

    KAFKA-6538: Changes to enhance  ByteStore exceptions thrown from RocksDBStore with more human readable info (#5103)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../state/internals/InnerMeteredKeyValueStore.java | 72 +++++++++++++---------
 .../state/internals/MeteredSessionStore.java       |  7 +++
 .../state/internals/MeteredWindowStore.java        |  4 ++
 .../streams/state/internals/RocksDBStore.java      |  9 ++-
 4 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 14464e0..200b2d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -173,30 +174,40 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
 
     @Override
     public V get(final K key) {
-        if (getTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
-                }
-            }, getTime);
-        } else {
-            return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+        try {
+            if (getTime.shouldRecord()) {
+                return measureLatency(new Action<V>() {
+                    @Override
+                    public V execute() {
+                        return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+                    }
+                }, getTime);
+            } else {
+                return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
         }
     }
 
     @Override
     public void put(final K key, final V value) {
-        if (putTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
-                    return null;
-                }
-            }, putTime);
-        } else {
-            inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+        try {
+            if (putTime.shouldRecord()) {
+                measureLatency(new Action<V>() {
+                    @Override
+                    public V execute() {
+                        inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+                        return null;
+                    }
+                }, putTime);
+            } else {
+                inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
         }
     }
 
@@ -232,15 +243,20 @@ class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.Abstract
 
     @Override
     public V delete(final K key) {
-        if (deleteTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
-                }
-            }, deleteTime);
-        } else {
-            return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+        try {
+            if (deleteTime.shouldRecord()) {
+                return measureLatency(new Action<V>() {
+                    @Override
+                    public V execute() {
+                        return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+                    }
+                }, deleteTime);
+            } else {
+                return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+            }
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 5636219..3e881ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -128,6 +129,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         try {
             final Bytes key = keyBytes(sessionKey.key());
             inner.remove(new Windowed<>(key, sessionKey.window()));
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), sessionKey.key());
+            throw new ProcessorStateException(message, e);
         } finally {
             this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
         }
@@ -140,6 +144,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         try {
             final Bytes key = keyBytes(sessionKey.key());
             this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), sessionKey.key(), aggregate);
+            throw new ProcessorStateException(message, e);
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2487854..62ed6c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -94,6 +95,9 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         final long startNs = time.nanoseconds();
         try {
             inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, value);
+            throw new ProcessorStateException(message, e);
         } finally {
             metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ff6c56a..cfef035 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -224,7 +224,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         try {
             return this.db.get(rawKey);
         } catch (final RocksDBException e) {
-            throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
+            // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+            throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e);
         }
     }
 
@@ -300,13 +301,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
             try {
                 db.delete(wOptions, rawKey);
             } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while removing key from store " + this.name, e);
+                // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+                throw new ProcessorStateException("Error while removing key %s from store " + this.name, e);
             }
         } else {
             try {
                 db.put(wOptions, rawKey, rawValue);
             } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e);
+                // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+                throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e);
             }
         }
     }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.