You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/11/03 18:11:23 UTC

[kafka] branch 3.1 updated: KAFKA-13261: Add support for custom partitioners in foreign key joins (#11368)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new fdcbcef  KAFKA-13261: Add support for custom partitioners in foreign key joins (#11368)
fdcbcef is described below

commit fdcbcefd6984f0f53db1c1b0b81cf93b7ff06dd5
Author: Victoria Xia <vi...@gmail.com>
AuthorDate: Wed Nov 3 10:55:24 2021 -0700

    KAFKA-13261: Add support for custom partitioners in foreign key joins (#11368)
    
    Implements KIP-775.
    
    Co-authored-by: Tomas Forsman <to...@users.noreply.github.com>
---
 .../org/apache/kafka/streams/kstream/KTable.java   | 111 +++++++++
 .../apache/kafka/streams/kstream/TableJoined.java  | 135 +++++++++++
 .../streams/kstream/internals/KTableImpl.java      | 119 ++++++++--
 .../kstream/internals/TableJoinedInternal.java     |  39 ++++
 .../ForeignJoinSubscriptionProcessorSupplier.java  |   4 -
 ...yInnerJoinCustomPartitionerIntegrationTest.java | 255 +++++++++++++++++++++
 .../kafka/streams/scala/kstream/KTable.scala       |  48 +++-
 .../kafka/streams/TopologyTestDriverTest.java      |   4 +-
 8 files changed, 695 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index ad86614..d1c8623 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -2117,13 +2118,39 @@ public interface KTable<K, V> {
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
      * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     *
+     * @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined)} instead.
      */
+    @Deprecated
     <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                     final Function<V, KO> foreignKeyExtractor,
                                     final ValueJoiner<V, VO, VR> joiner,
                                     final Named named);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
+     * using the {@link TableJoined} instance for optional configurations including
+     * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final Function<V, KO> foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner,
+                                    final TableJoined<K, KO> tableJoined);
+
+    /**
      * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
      * <p>
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
@@ -2160,7 +2187,10 @@ public interface KTable<K, V> {
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
      * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     *
+     * @deprecated since 3.1, removal planned for 4.0. Use {@link #join(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead.
      */
+    @Deprecated
     <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                     final Function<V, KO> foreignKeyExtractor,
                                     final ValueJoiner<V, VO, VR> joiner,
@@ -2168,6 +2198,32 @@ public interface KTable<K, V> {
                                     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
+     * using the {@link TableJoined} instance for optional configurations including
+     * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
+     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final Function<V, KO> foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner,
+                                    final TableJoined<K, KO> tableJoined,
+                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
      * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
      * <p>
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
@@ -2199,13 +2255,39 @@ public interface KTable<K, V> {
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
      * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     *
+     * @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined)} instead.
      */
+    @Deprecated
     <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                         final Function<V, KO> foreignKeyExtractor,
                                         final ValueJoiner<V, VO, VR> joiner,
                                         final Named named);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
+     * using the {@link TableJoined} instance for optional configurations including
+     * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final Function<V, KO> foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner,
+                                        final TableJoined<K, KO> tableJoined);
+
+    /**
      * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
      * <p>
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
@@ -2242,7 +2324,10 @@ public interface KTable<K, V> {
      * @param <KO>                the key type of the other {@code KTable}
      * @param <VO>                the value type of the other {@code KTable}
      * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     *
+     * @deprecated since 3.1, removal planned for 4.0. Use {@link #leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized)} instead.
      */
+    @Deprecated
     <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                         final Function<V, KO> foreignKeyExtractor,
                                         final ValueJoiner<V, VO, VR> joiner,
@@ -2250,6 +2335,32 @@ public interface KTable<K, V> {
                                         final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
+     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
+     * using the {@link TableJoined} instance for optional configurations including
+     * {@link StreamPartitioner partitioners} when the tables being joined use non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            result is null, the update is ignored as invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
+     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final Function<V, KO> foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner,
+                                        final TableJoined<K, KO> tableJoined,
+                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
      * Get the name of the local state store used that can be used to query this {@code KTable}.
      *
      * @return the underlying state store name, or {@code null} if this {@code KTable} cannot be queried.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TableJoined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TableJoined.java
new file mode 100644
index 0000000..70a3630
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TableJoined.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+import java.util.function.Function;
+
+/**
+ * The {@code TableJoined} class represents optional parameters that can be passed to
+ * {@link KTable#join(KTable, Function, ValueJoiner, TableJoined) KTable#join(KTable,Function,...)} and
+ * {@link KTable#leftJoin(KTable, Function, ValueJoiner, TableJoined) KTable#leftJoin(KTable,Function,...)}
+ * operations, for foreign key joins.
+ * @param <K>   this key type; key type for the left (primary) table
+ * @param <KO>  other key type; key type for the right (foreign key) table
+ */
+public class TableJoined<K, KO> implements NamedOperation<TableJoined<K, KO>> {
+
+    protected final StreamPartitioner<K, Void> partitioner;
+    protected final StreamPartitioner<KO, Void> otherPartitioner;
+    protected final String name;
+
+    private TableJoined(final StreamPartitioner<K, Void> partitioner,
+                        final StreamPartitioner<KO, Void> otherPartitioner,
+                        final String name) {
+        this.partitioner = partitioner;
+        this.otherPartitioner = otherPartitioner;
+        this.name = name;
+    }
+
+    protected TableJoined(final TableJoined<K, KO> tableJoined) {
+        this(tableJoined.partitioner, tableJoined.otherPartitioner, tableJoined.name);
+    }
+
+    /**
+     * Create an instance of {@code TableJoined} with partitioner and otherPartitioner {@link StreamPartitioner} instances.
+     * {@code null} values are accepted and will result in the default partitioner being used.
+     *
+     * @param partitioner      a {@link StreamPartitioner} that captures the partitioning strategy for the left (primary)
+     *                         table of the foreign key join. Specifying this option does not repartition or otherwise
+     *                         affect the source table; rather, this option informs the foreign key join on how internal
+     *                         topics should be partitioned in order to be co-partitioned with the left join table.
+     *                         The partitioning strategy must depend only on the message key and not the message value,
+     *                         else the source table is not supported with foreign key joins. This option may be left
+     *                         {@code null} if the source table uses the default partitioner.
+     * @param otherPartitioner a {@link StreamPartitioner} that captures the partitioning strategy for the right (foreign
+     *                         key) table of the foreign key join. Specifying this option does not repartition or otherwise
+     *                         affect the source table; rather, this option informs the foreign key join on how internal
+     *                         topics should be partitioned in order to be co-partitioned with the right join table.
+     *                         The partitioning strategy must depend only on the message key and not the message value,
+     *                         else the source table is not supported with foreign key joins. This option may be left
+     *                         {@code null} if the source table uses the default partitioner.
+     * @param <K>              this key type; key type for the left (primary) table
+     * @param <KO>             other key type; key type for the right (foreign key) table
+     * @return new {@code TableJoined} instance with the provided partitioners
+     */
+    public static <K, KO> TableJoined<K, KO> with(final StreamPartitioner<K, Void> partitioner,
+                                                  final StreamPartitioner<KO, Void> otherPartitioner) {
+        return new TableJoined<>(partitioner, otherPartitioner, null);
+    }
+
+    /**
+     * Create an instance of {@code TableJoined} with base name for all components of the join, including internal topics
+     * created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including internal topics
+     * @param <K>  this key type; key type for the left (primary) table
+     * @param <KO> other key type; key type for the right (foreign key) table
+     * @return new {@code TableJoined} instance configured with the {@code name}
+     *
+     */
+    public static <K, KO> TableJoined<K, KO> as(final String name) {
+        return new TableJoined<>(null, null, name);
+    }
+
+    /**
+     * Set the custom {@link StreamPartitioner} to be used as part of computing the join.
+     * {@code null} values are accepted and will result in the default partitioner being used.
+     *
+     * @param partitioner a {@link StreamPartitioner} that captures the partitioning strategy for the left (primary)
+     *                    table of the foreign key join. Specifying this option does not repartition or otherwise
+     *                    affect the source table; rather, this option informs the foreign key join on how internal
+     *                    topics should be partitioned in order to be co-partitioned with the left join table.
+     *                    The partitioning strategy must depend only on the message key and not the message value,
+     *                    else the source table is not supported with foreign key joins. This option may be left
+     *                    {@code null} if the source table uses the default partitioner.
+     * @return new {@code TableJoined} instance configured with the {@code partitioner}
+     */
+    public TableJoined<K, KO> withPartitioner(final StreamPartitioner<K, Void> partitioner) {
+        return new TableJoined<>(partitioner, otherPartitioner, name);
+    }
+
+    /**
+     * Set the custom other {@link StreamPartitioner} to be used as part of computing the join.
+     * {@code null} values are accepted and will result in the default partitioner being used.
+     *
+     * @param otherPartitioner a {@link StreamPartitioner} that captures the partitioning strategy for the right (foreign
+     *                         key) table of the foreign key join. Specifying this option does not repartition or otherwise
+     *                         affect the source table; rather, this option informs the foreign key join on how internal
+     *                         topics should be partitioned in order to be co-partitioned with the right join table.
+     *                         The partitioning strategy must depend only on the message key and not the message value,
+     *                         else the source table is not supported with foreign key joins. This option may be left
+     *                         {@code null} if the source table uses the default partitioner.
+     * @return new {@code TableJoined} instance configured with the {@code otherPartitioner}
+     */
+    public TableJoined<K, KO> withOtherPartitioner(final StreamPartitioner<KO, Void> otherPartitioner) {
+        return new TableJoined<>(partitioner, otherPartitioner, name);
+    }
+
+    /**
+     * Set the base name used for all components of the join, including internal topics
+     * created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including internal topics
+     * @return new {@code TableJoined} instance configured with the {@code name}
+     */
+    @Override
+    public TableJoined<K, KO> withName(final String name) {
+        return new TableJoined<>(partitioner, otherPartitioner, name);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6f73044..8c2fb69a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.TableJoined;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -58,6 +59,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressi
 import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
@@ -889,12 +891,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             other,
             foreignKeyExtractor,
             joiner,
-            NamedInternal.empty(),
+            TableJoined.with(null, null),
             Materialized.with(null, null),
             false
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                            final Function<V, KO> foreignKeyExtractor,
@@ -904,7 +907,22 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             other,
             foreignKeyExtractor,
             joiner,
-            named,
+            TableJoined.as(new NamedInternal(named).name()),
+            Materialized.with(null, null),
+            false
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final Function<V, KO> foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner,
+                                           final TableJoined<K, KO> tableJoined) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            tableJoined,
             Materialized.with(null, null),
             false
         );
@@ -915,16 +933,40 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                            final Function<V, KO> foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), materialized, false);
+        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                            final Function<V, KO> foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Named named,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized, false);
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            TableJoined.as(new NamedInternal(named).name()),
+            materialized,
+            false
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final Function<V, KO> foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner,
+                                           final TableJoined<K, KO> tableJoined,
+                                           final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            tableJoined,
+            materialized,
+            false
+        );
     }
 
     @Override
@@ -935,12 +977,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             other,
             foreignKeyExtractor,
             joiner,
-            NamedInternal.empty(),
+            TableJoined.with(null, null),
             Materialized.with(null, null),
             true
         );
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                                final Function<V, KO> foreignKeyExtractor,
@@ -950,7 +993,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             other,
             foreignKeyExtractor,
             joiner,
-            named,
+            TableJoined.as(new NamedInternal(named).name()),
             Materialized.with(null, null),
             true
         );
@@ -960,9 +1003,46 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                                final Function<V, KO> foreignKeyExtractor,
                                                final ValueJoiner<V, VO, VR> joiner,
+                                               final TableJoined<K, KO> tableJoined) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            tableJoined,
+            Materialized.with(null, null),
+            true
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final Function<V, KO> foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> joiner,
                                                final Named named,
                                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named, materialized, true);
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            TableJoined.as(new NamedInternal(named).name()),
+            materialized,
+            true);
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final Function<V, KO> foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> joiner,
+                                               final TableJoined<K, KO> tableJoined,
+                                               final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return doJoinOnForeignKey(
+            other,
+            foreignKeyExtractor,
+            joiner,
+            tableJoined,
+            materialized,
+            true);
     }
 
     @Override
@@ -970,20 +1050,20 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                                final Function<V, KO> foreignKeyExtractor,
                                                final ValueJoiner<V, VO, VR> joiner,
                                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, NamedInternal.empty(), materialized, true);
+        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true);
     }
 
     @SuppressWarnings("unchecked")
     private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> foreignKeyTable,
                                                           final Function<V, KO> foreignKeyExtractor,
                                                           final ValueJoiner<V, VO, VR> joiner,
-                                                          final Named joinName,
+                                                          final TableJoined<K, KO> tableJoined,
                                                           final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                                           final boolean leftJoin) {
         Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be null");
         Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
-        Objects.requireNonNull(joinName, "joinName can't be null");
+        Objects.requireNonNull(tableJoined, "tableJoined can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
         //Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
@@ -995,7 +1075,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues(true);
 
-        final NamedInternal renamed = new NamedInternal(joinName);
+        final TableJoinedInternal<K, KO> tableJoinedInternal = new TableJoinedInternal<>(tableJoined);
+        final NamedInternal renamed = new NamedInternal(tableJoinedInternal.name());
 
         final String subscriptionTopicName = renamed.suffixWithOrElseGet(
             "-subscription-registration",
@@ -1045,10 +1126,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         builder.addGraphNode(graphNode, subscriptionNode);
 
 
+        final StreamPartitioner<KO, SubscriptionWrapper<K>> subscriptionSinkPartitioner =
+            tableJoinedInternal.otherPartitioner() == null
+                ? null
+                : (topic, key, val, numPartitions) ->
+                    tableJoinedInternal.otherPartitioner().partition(topic, key, null, numPartitions);
+
         final StreamSinkNode<KO, SubscriptionWrapper<K>> subscriptionSink = new StreamSinkNode<>(
             renamed.suffixWithOrElseGet("-subscription-registration-sink", builder, SINK_NAME),
             new StaticTopicNameExtractor<>(subscriptionTopicName),
-            new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde))
+            new ProducedInternal<>(Produced.with(foreignKeySerde, subscriptionWrapperSerde, subscriptionSinkPartitioner))
         );
         builder.addGraphNode(subscriptionNode, subscriptionSink);
 
@@ -1115,11 +1202,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final String finalRepartitionTopicName = renamed.suffixWithOrElseGet("-subscription-response", builder, SUBSCRIPTION_RESPONSE) + TOPIC_SUFFIX;
         builder.internalTopologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty());
 
+        final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
+            tableJoinedInternal.partitioner() == null
+                ? null
+                : (topic, key, val, numPartitions) ->
+                    tableJoinedInternal.partitioner().partition(topic, key, null, numPartitions);
+
         final StreamSinkNode<K, SubscriptionResponseWrapper<VO>> foreignResponseSink =
             new StreamSinkNode<>(
                 renamed.suffixWithOrElseGet("-subscription-response-sink", builder, SINK_NAME),
                 new StaticTopicNameExtractor<>(finalRepartitionTopicName),
-                new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde))
+                new ProducedInternal<>(Produced.with(keySerde, responseWrapperSerde, foreignResponseSinkPartitioner))
             );
         builder.addGraphNode(subscriptionJoinForeignNode, foreignResponseSink);
         builder.addGraphNode(foreignJoinSubscriptionNode, foreignResponseSink);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TableJoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TableJoinedInternal.java
new file mode 100644
index 0000000..fe16552
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TableJoinedInternal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.TableJoined;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+public class TableJoinedInternal<K, KO> extends TableJoined<K, KO> {
+    /** Hands {@code tableJoined} to the {@link TableJoined} constructor; the accessors below read the inherited fields. */
+    TableJoinedInternal(final TableJoined<K, KO> tableJoined) {
+        super(tableJoined);
+    }
+    /** @return the inherited {@code partitioner} (keyed by {@code K}); callers in KTableImpl treat {@code null} as "not set" */
+    public StreamPartitioner<K, Void> partitioner() {
+        return this.partitioner;
+    }
+    /** @return the inherited {@code otherPartitioner} (keyed by {@code KO}); callers in KTableImpl treat {@code null} as "not set" */
+    public StreamPartitioner<KO, Void> otherPartitioner() {
+        return this.otherPartitioner;
+    }
+    /** @return the inherited {@code name}, which KTableImpl wraps in a NamedInternal to derive processor and topic names */
+    public String name() {
+        return this.name;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index 7b1ec66..f0114b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@@ -69,9 +68,6 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.
             store = internalProcessorContext.getStateStore(storeBuilder);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
         @Override
         public void process(final KO key, final Change<VO> value) {
             // if the key is null, we do not need proceed aggregating
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
new file mode 100644
index 0000000..c83bbae
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Repartitioned;
+import org.apache.kafka.streams.kstream.TableJoined;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import kafka.utils.MockTime;
+
+@Category({IntegrationTest.class})
+public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
+    // Runs a foreign-key inner join on three Streams instances with custom partitioners supplied via TableJoined.
+    private final static int NUM_BROKERS = 1;
+    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final static MockTime MOCK_TIME = CLUSTER.time;
+    private final static String TABLE_1 = "table1";
+    private final static String TABLE_2 = "table2";
+    private final static String OUTPUT = "output-";
+    private final Properties streamsConfig = getStreamsConfig();
+    private final Properties streamsConfigTwo = getStreamsConfig();
+    private final Properties streamsConfigThree = getStreamsConfig();
+    private KafkaStreams streams;
+    private KafkaStreams streamsTwo;
+    private KafkaStreams streamsThree;
+    private final static Properties CONSUMER_CONFIG = new Properties();
+
+    private final static Properties PRODUCER_CONFIG_1 = new Properties();
+    private final static Properties PRODUCER_CONFIG_2 = new Properties();
+
+    @BeforeClass
+    public static void startCluster() throws IOException, InterruptedException {
+        CLUSTER.start();
+        // Use multiple partitions to ensure distribution of keys.
+
+        CLUSTER.createTopic(TABLE_1, 4, 1);
+        CLUSTER.createTopic(TABLE_2, 4, 1);
+        CLUSTER.createTopic(OUTPUT, 4, 1);
+
+        PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        // All table1 keys and values carry the "ID123" prefix, which is the only key in table2.
+        final List<KeyValue<String, String>> table1 = asList(
+            new KeyValue<>("ID123-1", "ID123-A1"),
+            new KeyValue<>("ID123-2", "ID123-A2"),
+            new KeyValue<>("ID123-3", "ID123-A3"),
+            new KeyValue<>("ID123-4", "ID123-A4")
+        );
+
+        final List<KeyValue<String, String>> table2 = asList(
+            new KeyValue<>("ID123", "BBB")
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws IOException {
+        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
+        streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
+        streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
+    }
+
+    @After
+    public void after() throws IOException {
+        if (streams != null) {
+            streams.close();
+            streams = null;
+        }
+        if (streamsTwo != null) {
+            streamsTwo.close();
+            streamsTwo = null;
+        }
+        if (streamsThree != null) {
+            streamsThree.close();
+            streamsThree = null;
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
+    }
+
+    @Test
+    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
+        final Set<KeyValue<String, String>> expectedOne = new HashSet<>();
+        expectedOne.add(new KeyValue<>("ID123-1", "value1=ID123-A1,value2=BBB"));
+        expectedOne.add(new KeyValue<>("ID123-2", "value1=ID123-A2,value2=BBB"));
+        expectedOne.add(new KeyValue<>("ID123-3", "value1=ID123-A3,value2=BBB"));
+        expectedOne.add(new KeyValue<>("ID123-4", "value1=ID123-A4,value2=BBB"));
+
+        verifyKTableKTableJoin(expectedOne);
+    }
+    /** Starts three instances of the topology and asserts that the records read from OUTPUT equal {@code expectedResult}. */
+    private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
+        final String innerJoinType = "INNER";
+        final String queryableName = innerJoinType + "-store1";
+
+        streams = prepareTopology(queryableName, streamsConfig);
+        streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
+        streamsThree = prepareTopology(queryableName, streamsConfigThree);
+
+        final List<KafkaStreams> kafkaStreamsList = asList(streams, streamsTwo, streamsThree);
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+
+        final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived(
+            CONSUMER_CONFIG,
+            OUTPUT,
+            expectedResult.size()));
+
+        assertEquals(expectedResult, result);
+    }
+
+    private static Properties getStreamsConfig() {
+        final Properties streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");
+        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+
+        return streamsConfig;
+    }
+    /** Builds the repartition -> toTable -> foreign-key join -> OUTPUT topology and wraps it in a KafkaStreams instance. */
+    private static KafkaStreams prepareTopology(final String queryableName, final Properties streamsConfig) {
+
+        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> table1 = builder.stream(TABLE_1,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
+            .repartition(repartitionA())
+            .toTable(Named.as("table.a"));
+
+        final KTable<String, String> table2 = builder
+            .stream(TABLE_2,
+                Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
+            .repartition(repartitionB())
+            .toTable(Named.as("table.b"));
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
+        if (queryableName != null) {
+            materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
+                .withKeySerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true))
+                .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+                .withCachingDisabled();
+        } else {
+            throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
+        }
+
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
+        // floorMod keeps the partition non-negative even for Integer.MIN_VALUE hashes, unlike Math.abs(hash) % n.
+        final TableJoined<String, String> tableJoined = TableJoined.with(
+            (topic, key, value, numPartitions) -> Math.floorMod(getKeyB(key).hashCode(), numPartitions),
+            (topic, key, value, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions)
+        );
+
+        table1.join(table2, KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB, joiner, tableJoined, materialized)
+            .toStream()
+            .to(OUTPUT,
+                Produced.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                    serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
+
+        return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
+    }
+    /** Repartitions table1's input by the foreign-key prefix of its key, using the same rule as the join's first partitioner. */
+    private static Repartitioned<String, String> repartitionA() {
+        final Repartitioned<String, String> repartitioned = Repartitioned.as("a");
+        return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            .withStreamPartitioner((topic, key, value, numPartitions) -> Math.floorMod(getKeyB(key).hashCode(), numPartitions))
+            .withNumberOfPartitions(4);
+    }
+    /** Repartitions table2's input by the hash of its key, using the same rule as the join's second partitioner. */
+    private static Repartitioned<String, String> repartitionB() {
+        final Repartitioned<String, String> repartitioned = Repartitioned.as("b");
+        return repartitioned.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+            .withStreamPartitioner((topic, key, value, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions))
+            .withNumberOfPartitions(4);
+    }
+    /** Returns the prefix of {@code value} up to (excluding) its first "-", which the test uses as the foreign key. */
+    private static String getKeyB(final String value) {
+        return value.substring(0, value.indexOf("-"));
+    }
+
+}
\ No newline at end of file
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 892f39e..3a405b6 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.streams.kstream.{ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ}
+import org.apache.kafka.streams.kstream.{TableJoined, ValueJoiner, ValueTransformerWithKeySupplier, KTable => KTableJ}
 import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
   FunctionFromFunction,
   KeyValueMapperFromFunction,
@@ -656,6 +656,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
+  @deprecated("Use join(KTable, Function, ValueJoiner, TableJoined, Materialized) instead", since = "3.1")
   def join[VR, KO, VO](
     other: KTable[KO, VO],
     keyExtractor: Function[V, KO],
@@ -666,6 +667,28 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
 
   /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using a non-windowed foreign-key inner join. Each
+   * record of this table is matched against the record of `other` whose key equals the result of `keyExtractor`.
+   *
+   * @param other        the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this table's value
+   * @param joiner       a function that computes the join result for a pair of matching records
+   * @param tableJoined  a [[TableJoined]] used to configure partitioners and names of internal topics and stores
+   * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   *         one for each record of this table whose extracted foreign key matches a key of `other`
+   */
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    tableJoined: TableJoined[K, KO],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
+
+  /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left join. Records from this
    * table are joined according to the result of keyExtractor on the other KTable.
    *
@@ -698,6 +721,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
+  @deprecated("Use leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized) instead", since = "3.1")
   def leftJoin[VR, KO, VO](
     other: KTable[KO, VO],
     keyExtractor: Function[V, KO],
@@ -708,6 +732,28 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
 
   /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using a non-windowed foreign-key left join. Each
+   * record of this table is looked up in `other` by the foreign key that `keyExtractor` derives from its value.
+   *
+   * @param other        the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this table's value
+   * @param joiner       a function that computes the join result for a pair of matching records
+   * @param tableJoined  a [[TableJoined]] used to configure partitioners and names of internal topics and stores
+   * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner;
+   *         records are paired via the foreign key extracted from this table's value, not via identical keys
+   */
+  def leftJoin[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    tableJoined: TableJoined[K, KO],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, tableJoined, materialized))
+
+  /**
    * Get the name of the local state store used that can be used to query this [[KTable]].
    *
    * @return the underlying state store name, or `null` if this [[KTable]] cannot be queried.
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 937951f..c541650 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.TableJoined;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
@@ -413,7 +413,7 @@ public abstract class TopologyTestDriverTest {
             .count(Materialized.as(firstTableName));
 
         builder.table(SOURCE_TOPIC_2, Materialized.as(secondTableName))
-            .join(t1, v -> v, (v1, v2) -> v2, Named.as(joinName));
+            .join(t1, v -> v, (v1, v2) -> v2, TableJoined.as(joinName));
 
         return builder.build(config);
     }