Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/20 18:45:00 UTC

[GitHub] [kafka] vvcephei opened a new pull request #11617: KAFKA-13557: Remove swapResult from the public API

vvcephei opened a new pull request #11617:
URL: https://github.com/apache/kafka/pull/11617


   Follow-on from https://github.com/apache/kafka/pull/11582 .
   Removes a public API method in favor of an internal utility method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772639230



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor

Review comment:
       Thanks!
   
   There was no particular reason not to make it an interface. I went ahead and did so in the latest commit, so you can see if you think it's more elegant that way.
   
   I placed the concrete implementations as well as the util class in an `internals` package, which also means that the special constructor can remain package-private and still be accessible from the util.
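
A minimal sketch of the layout described in this comment, assuming simplified stand-in types. `SketchResult`, `SucceededSketchResult`, and `SketchResultUtil` are hypothetical names for illustration, not the actual Kafka classes, and everything shares one file here only to keep the example self-contained:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for the public result type (not the real Kafka interface).
interface SketchResult<R> {
    R getResult();

    List<String> getExecutionInfo();
}

// Concrete implementation; in the layout described above it would sit in an internals package.
final class SucceededSketchResult<R> implements SketchResult<R> {
    private final R result;
    private final List<String> executionInfo;

    // Package-private constructor: callable only from classes in the same package.
    SucceededSketchResult(final R result, final List<String> executionInfo) {
        this.result = result;
        this.executionInfo = new ArrayList<>(executionInfo);
    }

    @Override
    public R getResult() {
        return result;
    }

    @Override
    public List<String> getExecutionInfo() {
        return Collections.unmodifiableList(executionInfo);
    }
}

// Utility in the same package as the implementation, so it can reach the constructor.
final class SketchResultUtil {
    // uninstantiable utility class
    private SketchResultUtil() {}

    // Creates a new result that keeps the execution info of rawResult but carries a new payload.
    static <R> SketchResult<R> copyAndSubstituteDeserializedResult(
        final SketchResult<?> rawResult,
        final R deserializedResult) {
        return new SucceededSketchResult<>(deserializedResult, rawResult.getExecutionInfo());
    }
}
```

Callers outside the package would see only the interface, while the utility and the implementation stay free to change.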

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    private InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(

Review comment:
       Sure thing!







[GitHub] [kafka] vvcephei commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772679426



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query.internals;
+
+
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class SucceededQueryResult<R> implements QueryResult<R> {
+
+    private final R result;
+    private List<String> executionInfo = new LinkedList<>();
+    private Position position;

Review comment:
       Sure; my experience says that keeping the class hierarchy as flat as possible is better for maintainability, but since you prefer an abstract class, I have added one.







[GitHub] [kafka] vvcephei merged pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11617:
URL: https://github.com/apache/kafka/pull/11617


   





[GitHub] [kafka] mjsax commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772607893



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    private InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(

Review comment:
       Can we add some JavaDoc? For example:
   ```
   Creates a new `QueryResult` preserving the execution info and position of the provided result.
   ```







[GitHub] [kafka] vvcephei commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772642542



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    private InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(
+        final QueryResult<?> rawResult,
+        final R deserializedResult) {

Review comment:
       It seems like that would split the deserialization responsibility between the Metered stores (whose job it is to handle de/serialization) and the QueryResult domain, whose job it is to just carry results, so I reckon it's better to keep things as they are now.
   
   I agree with your motivation to clean up the Metered stores' logic, though. I have some ideas about that:
   * https://github.com/apache/kafka/pull/11614
   * https://github.com/vvcephei/kafka/pull/1
   
   Those PRs are sketches of possible approaches to https://issues.apache.org/jira/browse/KAFKA-13526
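
The division of responsibility argued for here can be sketched roughly as follows. All names (`CarriedResult`, `CarriedResultUtil`, `MeteredStoreSketch`) are hypothetical stand-ins rather than Kafka classes, and a plain `Function<byte[], V>` is assumed in place of a real deserializer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical carrier type: it only holds a payload and execution info, with no serde knowledge.
final class CarriedResult<R> {
    private final R result;
    private final List<String> executionInfo;

    CarriedResult(final R result, final List<String> executionInfo) {
        this.result = result;
        this.executionInfo = new ArrayList<>(executionInfo);
    }

    R getResult() {
        return result;
    }

    List<String> getExecutionInfo() {
        return executionInfo;
    }
}

// Hypothetical utility: copies the metadata and swaps in an already-deserialized payload.
final class CarriedResultUtil {
    private CarriedResultUtil() {}

    static <R> CarriedResult<R> copyAndSubstitute(final CarriedResult<?> raw, final R deserialized) {
        return new CarriedResult<>(deserialized, raw.getExecutionInfo());
    }
}

// Hypothetical stand-in for a metered store: the only layer that owns a deserializer.
final class MeteredStoreSketch<V> {
    private final Function<byte[], V> valueDeserializer;

    MeteredStoreSketch(final Function<byte[], V> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    // Deserialization happens here; the result type merely carries the outcome.
    CarriedResult<V> toTypedResult(final CarriedResult<byte[]> rawResult) {
        final V typed = valueDeserializer.apply(rawResult.getResult());
        return CarriedResultUtil.copyAndSubstitute(rawResult, typed);
    }
}
```

Under this split, the result class never needs to know which serde a store uses.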







[GitHub] [kafka] vvcephei commented on pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#issuecomment-998384051


   Unrelated test failures:
   
   ```
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.oauthbearer.secured.RefreshingHttpsJwksTest.testBasicScheduleRefresh() | 21 ms | 1
   -- | -- | --
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.oauthbearer.secured.RefreshingHttpsJwksTest.testBasicScheduleRefresh() | 0.6 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, Security=PLAINTEXT | 1 min 10 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT | 1 min 15 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, Security=PLAINTEXT
   ```





[GitHub] [kafka] mjsax commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772606055



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor

Review comment:
       I would prefer to move this class into an internal package and make the constructor of QueryResult `public` -- it won't be useful for KS devs, but it also does not hurt if we allow them to create a `QueryResult`?
   
   Or should we extend `QueryResult`? (Btw: why is `QueryResult` not an interface?)







[GitHub] [kafka] mjsax commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772654543



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    private InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(
+        final QueryResult<?> rawResult,
+        final R deserializedResult) {

Review comment:
       Ack. Makes sense.







[GitHub] [kafka] mjsax commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772654218



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor

Review comment:
       > There was no particular reason not to make it an interface.
   
   I guess, we should switch this around and define everything as an interface by default, and only use classes if there is a good reason for it.







[GitHub] [kafka] mjsax commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772608820



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/InternalQueryResultUtil.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+
+import java.util.List;
+
+/**
+ * This utility class is an internal API, but it must be located in the same package as
+ * {@link QueryResult} so that it can access the package-private constructor
+ * {@link QueryResult#QueryResult(Object, List, Position)}
+ */
+@Unstable
+public final class InternalQueryResultUtil {
+    // uninstantiable utility class
+    private InternalQueryResultUtil() {}
+
+    public static <R> QueryResult<R> copyAndSubstituteDeserializedResult(
+        final QueryResult<?> rawResult,
+        final R deserializedResult) {

Review comment:
       Not sure how many cases we have, but could we instead pass in the deserializer and put all deserialization logic in this class? So far there are only two cases: a plain value and an Iterator.
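
For illustration only, the alternative raised in this comment might look roughly like the sketch below. It is a guess at the shape of the suggestion, not code from the PR: `RawResult` and `DeserializingUtilSketch` are hypothetical names, and a `Function<byte[], V>` stands in for a real deserializer:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical carrier for a payload plus execution info.
final class RawResult<R> {
    final R result;
    final List<String> executionInfo;

    RawResult(final R result, final List<String> executionInfo) {
        this.result = result;
        this.executionInfo = new ArrayList<>(executionInfo);
    }
}

// Hypothetical utility that takes the deserializer and handles both cases itself.
final class DeserializingUtilSketch {
    private DeserializingUtilSketch() {}

    // Case 1: a plain value is deserialized eagerly.
    static <V> RawResult<V> deserializeValue(
        final RawResult<byte[]> raw,
        final Function<byte[], V> deserializer) {
        return new RawResult<>(deserializer.apply(raw.result), raw.executionInfo);
    }

    // Case 2: an iterator is wrapped so each element is deserialized lazily on next().
    static <V> RawResult<Iterator<V>> deserializeIterator(
        final RawResult<Iterator<byte[]>> raw,
        final Function<byte[], V> deserializer) {
        final Iterator<byte[]> inner = raw.result;
        final Iterator<V> typed = new Iterator<V>() {
            @Override
            public boolean hasNext() {
                return inner.hasNext();
            }

            @Override
            public V next() {
                return deserializer.apply(inner.next());
            }
        };
        return new RawResult<>(typed, raw.executionInfo);
    }
}
```

The trade-off is that the utility would then need one method per result shape, and serde logic would no longer live only in the store layer.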







[GitHub] [kafka] mjsax commented on a change in pull request #11617: KAFKA-13557: Remove swapResult from the public API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11617:
URL: https://github.com/apache/kafka/pull/11617#discussion_r772656401



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/internals/SucceededQueryResult.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query.internals;
+
+
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.StateQueryRequest;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Container for a single partition's result when executing a {@link StateQueryRequest}.
+ *
+ * @param <R> The result type of the query.
+ */
+public final class SucceededQueryResult<R> implements QueryResult<R> {
+
+    private final R result;
+    private List<String> executionInfo = new LinkedList<>();
+    private Position position;

Review comment:
       `executionInfo` and `position` seem to be duplicated here and in `FailureQueryResult`. Should we add an abstract parent class for both to avoid code duplication?
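
One possible shape for such a parent class, as a hedged sketch: the class names below are hypothetical, and a plain `String` stands in for Kafka's `Position` type to keep the example self-contained.

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical base class holding the state shared by both result kinds.
abstract class AbstractResultSketch<R> {
    private final List<String> executionInfo = new LinkedList<>();
    private String position; // stand-in for Kafka's Position type

    void addExecutionInfo(final String message) {
        executionInfo.add(message);
    }

    List<String> getExecutionInfo() {
        return executionInfo;
    }

    void setPosition(final String position) {
        this.position = position;
    }

    String getPosition() {
        return position;
    }

    abstract boolean isSuccess();
}

// Success case: adds only the payload.
final class SucceededSketch<R> extends AbstractResultSketch<R> {
    private final R result;

    SucceededSketch(final R result) {
        this.result = result;
    }

    R getResult() {
        return result;
    }

    @Override
    boolean isSuccess() {
        return true;
    }
}

// Failure case: adds only the failure message.
final class FailedSketch<R> extends AbstractResultSketch<R> {
    private final String failureMessage;

    FailedSketch(final String failureMessage) {
        this.failureMessage = failureMessage;
    }

    String getFailureMessage() {
        return failureMessage;
    }

    @Override
    boolean isSuccess() {
        return false;
    }
}
```

Each subclass then declares only the field that distinguishes it, at the cost of one extra level in the class hierarchy.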



