You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/12/20 18:45:15 UTC

[kafka] branch iqv2-move-swapresult created (now 96453f3)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch iqv2-move-swapresult
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 96453f3  KAFKA-13557: Remove swapResult from the public API

This branch includes the following new commits:

     new 96453f3  KAFKA-13557: Remove swapResult from the public API

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[kafka] 01/01: KAFKA-13557: Remove swapResult from the public API

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-move-swapresult
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 96453f359d3d3b2416805e75cd0638051e4fc50f
Author: John Roesler <vv...@apache.org>
AuthorDate: Mon Dec 20 12:43:46 2021 -0600

    KAFKA-13557: Remove swapResult from the public API
---
 .../streams/query/InternalQueryResultUtil.java     | 49 ++++++++++++++++++++++
 .../apache/kafka/streams/query/QueryResult.java    | 24 +++++------
 .../state/internals/MeteredKeyValueStore.java      | 11 +++--
 3 files changed, 67 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java b/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
new file mode 100644
index 0000000..fc81126
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    public InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(
+        final QueryResult<?> rawResult,
+        final R deserializedResult) {
+
+        if (rawResult.isFailure()) {
+            throw new IllegalArgumentException(
+                "Callers must avoid calling this method on a failed result."
+            );
+        } else {
+            return new QueryResult<>(
+                deserializedResult,
+                rawResult.getExecutionInfo(),
+                rawResult.getPosition()
+            );
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
index 1e04e5b..2ffe5b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -41,6 +41,17 @@ public final class QueryResult<R> {
         this.failure = null;
     }
 
+    /**
+     * package-private constructor used in {@link InternalQueryResultUtil}.
+     */
+    QueryResult(final R result, final List<String> executionInfo, final Position position) {
+        this.result = result;
+        this.failureReason = null;
+        this.failure = null;
+        this.executionInfo = executionInfo;
+        this.position = position;
+    }
+
     private QueryResult(final FailureReason failureReason, final String failure) {
         this.result = null;
         this.failureReason = failureReason;
@@ -197,19 +208,6 @@ public final class QueryResult<R> {
         return result;
     }
 
-    public <V> QueryResult<V> swapResult(final V value) {
-        if (isFailure()) {
-            throw new IllegalArgumentException(
-                "Callers must avoid calling this method on a failed result."
-            );
-        } else {
-            final QueryResult<V> result = new QueryResult<>(value);
-            result.executionInfo = executionInfo;
-            result.position = position;
-            return result;
-        }
-    }
-
     @Override
     public String toString() {
         return "QueryResult{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index b8bacfa..32f4550 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.InternalQueryResultUtil;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -270,9 +271,11 @@ public class MeteredKeyValueStore<K, V>
                 getSensor,
                 getValueDeserializer()
             );
-            final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
-                resultIterator
-            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                    rawResult,
+                    resultIterator
+                );
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no result set.
@@ -296,7 +299,7 @@ public class MeteredKeyValueStore<K, V>
             final Deserializer<V> deserializer = getValueDeserializer();
             final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
             final QueryResult<V> typedQueryResult =
-                rawResult.swapResult(value);
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
             result = (QueryResult<R>) typedQueryResult;
         } else {
             // the generic type doesn't matter, since failed queries have no result set.