You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/12/09 03:59:56 UTC

[kafka] branch iqv2-key-query created (now bae3a36)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch iqv2-key-query
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at bae3a36  KAFKA-13525: Implement KeyQuery in Streams IQv2

This branch includes the following new commits:

     new bae3a36  KAFKA-13525: Implement KeyQuery in Streams IQv2

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[kafka] 01/01: KAFKA-13525: Implement KeyQuery in Streams IQv2

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-key-query
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bae3a36ab94d1c25ea0adccd21977b9653c30e63
Author: John Roesler <vv...@apache.org>
AuthorDate: Sun Dec 5 15:48:20 2021 -0600

    KAFKA-13525: Implement KeyQuery in Streams IQv2
    
    Implement the KeyQuery and RawKeyQuery as proposed in KIP-796
---
 .../apache/kafka/streams/query/FailureReason.java  |   9 +-
 .../org/apache/kafka/streams/query/KeyQuery.java   |  37 +++++++
 .../apache/kafka/streams/query/QueryResult.java    |  14 ++-
 .../apache/kafka/streams/query/RawKeyQuery.java    |  42 ++++++++
 .../state/internals/MeteredKeyValueStore.java      |  80 +++++++++++++++
 .../streams/state/internals/StoreQueryUtils.java   |  88 +++++++++++++---
 .../integration/IQv2StoreIntegrationTest.java      | 114 +++++++++++++++++++++
 7 files changed, 367 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
index 6ec319d..c250f1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
@@ -52,5 +52,12 @@ public enum FailureReason {
      * The requested store partition does not exist at all. For example, partition 4 was requested,
      * but the store in question only has 4 partitions (0 through 3).
      */
-    DOES_NOT_EXIST;
+    DOES_NOT_EXIST,
+
+    /**
+     * The store that handled the query got an exception during query execution. The message
+     * will contain the exception details. Depending on the nature of the exception, the caller
+     * may be able to retry this instance or may need to try a different instance.
+     */
+    STORE_EXCEPTION;
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
new file mode 100644
index 0000000..b183929
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/KeyQuery.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+
+@Evolving
+public class KeyQuery<K, V> implements Query<V> {
+
+    private final K key;
+
+    private KeyQuery(final K key) {
+        this.key = key;
+    }
+
+    public static <K, V> KeyQuery<K, V> withKey(final K key) {
+        return new KeyQuery<>(key);
+    }
+
+    public K getKey() {
+        return key;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
index 780ea86..38bbb2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
@@ -29,10 +29,10 @@ import java.util.List;
  */
 public final class QueryResult<R> {
 
-    private final List<String> executionInfo = new LinkedList<>();
     private final FailureReason failureReason;
     private final String failure;
     private final R result;
+    private List<String> executionInfo = new LinkedList<>();
     private Position position;
 
     private QueryResult(final R result) {
@@ -197,6 +197,18 @@ public final class QueryResult<R> {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;
+        } else {
+            final QueryResult<V> result = new QueryResult<>(value);
+            result.executionInfo = executionInfo;
+            result.position = position;
+            return result;
+        }
+    }
+
     @Override
     public String toString() {
         return "QueryResult{" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
new file mode 100644
index 0000000..80bd4e3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/RawKeyQuery.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
+import org.apache.kafka.common.utils.Bytes;
+
+@Evolving
+public class RawKeyQuery implements Query<byte[]> {
+
+    private final Bytes key;
+
+    private RawKeyQuery(final Bytes key) {
+        this.key = key;
+    }
+
+    public static RawKeyQuery withKey(final Bytes key) {
+        return new RawKeyQuery(key);
+    }
+
+    public static RawKeyQuery withKey(final byte[] key) {
+        return new RawKeyQuery(Bytes.wrap(key));
+    }
+
+    public Bytes getKey() {
+        return key;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 937288c..f382a35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -34,15 +35,24 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
@@ -79,6 +89,14 @@ public class MeteredKeyValueStore<K, V>
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
+    private  Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo)
+            )
+        );
+
     MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
                          final String metricsScope,
                          final Time time,
@@ -186,6 +204,68 @@ public class MeteredKeyValueStore<K, V>
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final RawKeyQuery rawKeyQuery = RawKeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
+            final Serde<V> vSerde = serdes.valueSerde();
+            final Deserializer<V> deserializer;
+            if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+                final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
+                    (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
+                deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
+            } else {
+                deserializer = vSerde.deserializer();
+            }
+            final V value = deserializer.deserialize(serdes.topic(), rawResult.getResult());
+            final QueryResult<V> typedQueryResult =
+                rawResult.swapResult(value);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 006981e..341f69f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -19,16 +19,73 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RawKeyQuery;
+import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Map;
-import java.util.Map.Entry;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 
 public final class StoreQueryUtils {
 
+    /**
+     * a utility interface to facilitate stores' query dispatch logic,
+     * allowing them to generically store query execution logic as the values
+     * in a map.
+     */
+    @FunctionalInterface
+    public interface QueryHandler {
+        QueryResult<?> apply(
+            final Query<?> query,
+            final PositionBound positionBound,
+            final boolean collectExecutionInfo,
+            final StateStore store
+        );
+    }
+
+
+    private static Map<Class, QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                PingQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
+            ),
+            mkEntry(RawKeyQuery.class,
+                (query, positionBound, collectExecutionInfo, store) -> {
+                    if (store instanceof KeyValueStore) {
+                        final RawKeyQuery rawKeyQuery = (RawKeyQuery) query;
+                        final KeyValueStore keyValueStore = (KeyValueStore) store;
+                        try {
+                            @SuppressWarnings("unchecked") final byte[] bytes =
+                                (byte[]) keyValueStore.get(rawKeyQuery.getKey());
+                            return QueryResult.forResult(bytes);
+                        } catch (final Throwable t) {
+                            final StringWriter stringWriter = new StringWriter();
+                            final PrintWriter printWriter = new PrintWriter(stringWriter);
+                            printWriter.println(
+                                store.getClass() + " failed to handle query " + query + ":");
+                            t.printStackTrace(printWriter);
+                            printWriter.flush();
+                            final String message = stringWriter.toString();
+                            return QueryResult.forFailure(
+                                FailureReason.STORE_EXCEPTION,
+                                message
+                            );
+                        }
+                    } else {
+                        return QueryResult.forUnknownQueryType(query, store);
+                    }
+                })
+        );
+
     // make this class uninstantiable
     private StoreQueryUtils() {
     }
@@ -43,16 +100,21 @@ public final class StoreQueryUtils {
         final int partition
     ) {
 
-        final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        if (query instanceof PingQuery) {
-            if (!isPermitted(position, positionBound, partition)) {
-                result = QueryResult.notUpToBound(position, positionBound, partition);
-            } else {
-                result = (QueryResult<R>) QueryResult.forResult(true);
-            }
-        } else {
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
             result = QueryResult.forUnknownQueryType(query, store);
+        } else if (!isPermitted(position, positionBound, partition)) {
+            result = QueryResult.notUpToBound(position, positionBound, partition);
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                store
+            );
         }
         if (collectExecutionInfo) {
             result.addExecutionInfo(
@@ -88,14 +150,10 @@ public final class StoreQueryUtils {
                 if (!partitionBounds.containsKey(partition)) {
                     // this topic isn't bounded for our partition, so just skip over it.
                 } else {
-                    if (seenPartitionBounds == null) {
-                        // we haven't seen a topic that is bounded for our partition
-                        return false;
-                    } else if (!seenPartitionBounds.containsKey(partition)) {
+                    if (!seenPartitionBounds.containsKey(partition)) {
                         // we haven't seen a partition that we have a bound for
                         return false;
-                    } else if (seenPartitionBounds.get(partition) < partitionBounds.get(
-                        partition)) {
+                    } else if (seenPartitionBounds.get(partition) < partitionBounds.get(partition)) {
                         // our current position is behind the bound
                         return false;
                     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 5a2e67b..97e030f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
@@ -48,6 +49,7 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.PingQuery;
@@ -77,6 +79,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
@@ -122,6 +125,11 @@ public class IQv2StoreIntegrationTest {
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         GLOBAL_IN_MEMORY_LRU {
             @Override
@@ -133,6 +141,11 @@ public class IQv2StoreIntegrationTest {
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         GLOBAL_ROCKS_KV {
             @Override
@@ -141,9 +154,19 @@ public class IQv2StoreIntegrationTest {
             }
 
             @Override
+            public boolean timestamped() {
+                return false;
+            }
+
+            @Override
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         GLOBAL_TIME_ROCKS_KV {
             @Override
@@ -155,30 +178,60 @@ public class IQv2StoreIntegrationTest {
             public boolean global() {
                 return true;
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         IN_MEMORY_KV {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.inMemoryKeyValueStore(STORE_NAME);
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         IN_MEMORY_LRU {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.lruMap(STORE_NAME, 100);
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         ROCKS_KV {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.persistentKeyValueStore(STORE_NAME);
             }
+
+            @Override
+            public boolean timestamped() {
+                return false;
+            }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         TIME_ROCKS_KV {
             @Override
             public StoreSupplier<?> supplier() {
                 return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
             }
+
+            @Override
+            public boolean keyValue() {
+                return true;
+            }
         },
         IN_MEMORY_WINDOW {
             @Override
@@ -216,9 +269,17 @@ public class IQv2StoreIntegrationTest {
 
         public abstract StoreSupplier<?> supplier();
 
+        public boolean timestamped() {
+            return true; // most stores are timestamped
+        };
+
         public boolean global() {
             return false;
         }
+
+        public boolean keyValue() {
+            return false;
+        }
     }
 
     @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
@@ -426,6 +487,22 @@ public class IQv2StoreIntegrationTest {
             shouldHandlePingQuery();
             shouldCollectExecutionInfo();
             shouldCollectExecutionInfoUnderFailure();
+
+            if (storeToTest.keyValue()) {
+                if (storeToTest.timestamped()) {
+                    shouldHandleKeyQuery(
+                        2,
+                        (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+                        2
+                    );
+                } else {
+                    shouldHandleKeyQuery(
+                        2,
+                        Function.identity(),
+                        2
+                    );
+                }
+            }
         }
     }
 
@@ -513,6 +590,43 @@ public class IQv2StoreIntegrationTest {
         assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
+    public <V> void shouldHandleKeyQuery(
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
+
+        final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+        final StateQueryRequest<V> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<V> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        final QueryResult<V> queryResult =
+            result.getGlobalResult() != null
+                ? result.getGlobalResult()
+                : result.getOnlyPartitionResult();
+        final boolean failure = queryResult.isFailure();
+        if (failure) {
+            throw new AssertionError(queryResult.toString());
+        }
+        assertThat(queryResult.isSuccess(), is(true));
+
+        assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+        assertThrows(IllegalArgumentException.class,
+            queryResult::getFailureMessage);
+
+        final V result1 = queryResult.getResult();
+        final Integer integer = valueExtactor.apply(result1);
+        assertThat(integer, is(expectedValue));
+
+        assertThat(queryResult.getExecutionInfo(), is(empty()));
+
+    }
+
     public void shouldCollectExecutionInfo() {
 
         final PingQuery query = new PingQuery();