You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/20 23:25:20 UTC

[kafka] branch trunk updated: MINOR: Fix type inference on joins and aggregates (#5019)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96cda0e  MINOR: Fix type inference on joins and aggregates (#5019)
96cda0e is described below

commit 96cda0e07ac4981a642c6b32fa543bcce78be769
Author: Joan Goyeau <jo...@goyeau.com>
AuthorDate: Mon May 21 00:25:16 2018 +0100

    MINOR: Fix type inference on joins and aggregates (#5019)
    
    The type inference doesn't currently work for the join functions in Scala as it doesn't know yet the types of the given KStream[K, V] or KTable[K, V].
    
    The fix here is to curry the joiner function. I personally prefer this notation but this also means it differs more from the Java API.
    I believe the diff with the Java API is worth in this case as it's not only solving the type inference but also fits better the Scala way of coding (ex: fold).
    
    Moreover any Scala dev will bug and spend little time on these functions trying to understand why the type inference is not working and then get frustrated to be obliged to be explicit here where it's not harmful to be inferred.
    
    Reviewers: Debasish Ghosh <dg...@acm.org>, Guozhang Wang <gu...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 .../streams/scala/kstream/KGroupedStream.scala     | 15 ++--
 .../streams/scala/kstream/KGroupedTable.scala      | 15 ++--
 .../kafka/streams/scala/kstream/KStream.scala      | 94 ++++++++++------------
 .../kafka/streams/scala/kstream/KTable.scala       | 36 ++++-----
 .../scala/kstream/SessionWindowedKStream.scala     | 14 ++--
 .../scala/kstream/TimeWindowedKStream.scala        | 12 +--
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  4 +-
 .../apache/kafka/streams/scala/TopologyTest.scala  |  2 +-
 8 files changed, 87 insertions(+), 105 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index acffb1f..2e85bce 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -84,8 +84,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
    */ 
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+  def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
 
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
     // works perfectly with Scala 2.12 though
@@ -101,9 +100,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -115,10 +113,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
+                                        materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
 
   /**
    * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 673ab5d..87a11c5 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -108,10 +108,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-                    adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator)
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator)
 
   /**
    * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -125,9 +123,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-                    adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR,
-                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR,
+                                        subtractor: (K, V, VR) => VR,
+                                        materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 4b0dc2b..49d9fe4 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -47,9 +47,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains only those records that satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filter`
    */ 
-  def filter(predicate: (K, V) => Boolean): KStream[K, V] = {
+  def filter(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filter(predicate.asPredicate)
-  }
 
   /**
    * Create a new [[KStream]] that consists all records of this stream which do <em>not</em> satisfy the given
@@ -59,9 +58,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filterNot`
    */ 
-  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] = {
+  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filterNot(predicate.asPredicate)
-  }
 
   /**
    * Set a new key (with possibly new type) for each input record.
@@ -73,9 +71,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with new key (possibly of different type) and unmodified value
    * @see `org.apache.kafka.streams.kstream.KStream#selectKey`
    */ 
-  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.selectKey[KR](mapper.asKeyValueMapper)
-  }
 
   /**
    * Transform each record of the input stream into a new record in the output stream (both key and value type can be
@@ -101,9 +98,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
    */ 
-  def mapValues[VR](mapper: V => VR): KStream[K, VR] = {
+  def mapValues[VR](mapper: V => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper)
-  }
 
   /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -114,9 +110,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
    */ 
-  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] = {
+  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Transform each record of the input stream into zero or more records in the output stream (both key and value type
@@ -145,9 +140,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
    */ 
-  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] = {
+  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapper)
-  }
 
   /**
    * Create a new [[KStream]] by transforming the value of each record in this stream into zero or more values
@@ -161,9 +155,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
    */ 
-  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] = {
+  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Print the records of this KStream using the options provided by `Printed`
@@ -179,9 +172,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
    */
-  def foreach(action: (K, V) => Unit): Unit = {
+  def foreach(action: (K, V) => Unit): Unit =
     inner.foreach((k: K, v: V) => action(k, v))
-  }
 
   /**
    * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
@@ -191,9 +183,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] = {
+  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
-  }
 
   /**
    * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for 
@@ -304,9 +295,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */ 
   def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
-                          stateStoreNames: String*): KStream[K, VR] = {
+                          stateStoreNames: String*): KStream[K, VR] =
     inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
-  }
 
   /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -335,9 +325,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param stateStoreNames   the names of the state store used by the processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */ 
-  def process(processorSupplier: () => Processor[K, V],
-    stateStoreNames: String*): Unit = {
-
+  def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -425,11 +413,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    * one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
-  def join[VO, VR](otherStream: KStream[K, VO],
+   */
+  def join[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
    * Join records of this stream with another [[KTable]]'s records using inner equi join with 
@@ -444,10 +433,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    * one for each matched record-pair with the same key 
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
-  def join[VT, VR](table: KTable[K, VT],
-    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
-      inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
+   */
+  def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
+    inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
   /**
    * Join records of this stream with `GlobalKTable`'s records using non-windowed inner equi join.
@@ -460,14 +448,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#join`
    */ 
-  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV): KStream[K, RV] =
-      inner.join[GK, GV, RV](
-        globalKTable,
-        ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
-        ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
-      )
+    joiner: (V, GV) => RV
+  ): KStream[K, RV] =
+    inner.join[GK, GV, RV](
+      globalKTable,
+      ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
+      ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
+    )
 
   /**
    * Join records of this stream with another [[KStream]]'s records using windowed left equi join with 
@@ -484,10 +473,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                    one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[VO, VR](otherStream: KStream[K, VO],
+  def leftJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
    * Join records of this stream with another [[KTable]]'s records using left equi join with 
@@ -503,9 +493,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                 one for each matched record-pair with the same key 
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[VT, VR](table: KTable[K, VT],
-    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
-      inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
+  def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
+    inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
   /**
    * Join records of this stream with `GlobalKTable`'s records using non-windowed left equi join.
@@ -518,12 +507,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV): KStream[K, RV] = {
-
+    joiner: (V, GV) => RV
+  ): KStream[K, RV] =
     inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, joiner.asValueJoiner)
-  }
 
   /**
    * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with 
@@ -540,10 +528,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
    */ 
-  def outerJoin[VO, VR](otherStream: KStream[K, VO],
+  def outerJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
    * Merge this stream and the given stream into one larger stream.
@@ -567,7 +556,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#peek`
    */
-  def peek(action: (K, V) => Unit): KStream[K, V] = {
+  def peek(action: (K, V) => Unit): KStream[K, V] =
     inner.peek((k: K, v: V) => action(k, v))
-  }
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 65cf895..cff1844 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -225,7 +225,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param serialized    the `Serialized` instance used to specify `Serdes`
    * @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
    * @see `org.apache.kafka.streams.kstream.KTable#groupBy`
-   */ 
+   */
   def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] =
     inner.groupBy(selector.asKeyValueMapper, serialized)
 
@@ -237,9 +237,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
-   */ 
-  def join[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -252,10 +251,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
-   */ 
-  def join[VO, VR](other: KTable[K, VO],
+   */
+  def join[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
@@ -266,9 +266,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def leftJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def leftJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -281,10 +280,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def leftJoin[VO, VR](other: KTable[K, VO],
+   */
+  def leftJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
@@ -295,9 +295,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def outerJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def outerJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -311,9 +310,10 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */ 
-  def outerJoin[VO, VR](other: KTable[K, VO],
+  def outerJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 1e25554..fd2a565 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -45,10 +45,9 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR,
-    merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
+                                        merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger)
 
   /**
    * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
@@ -61,11 +60,12 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
+  def aggregate[VR](initializer: => VR)(
     aggregator: (K, V, VR) => VR,
     merger: (K, VR, VR) => VR,
-    materialized: Materialized[K, VR, ByteArraySessionStore]): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
+    materialized: Materialized[K, VR, ByteArraySessionStore]
+  ): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key into `SessionWindows`.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index b00d025..a16c72b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -44,9 +44,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -58,10 +57,11 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
+  def aggregate[VR](initializer: => VR)(
     aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayWindowStore]): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
+    materialized: Materialized[K, VR, ByteArrayWindowStore]
+  ): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key and the defined windows.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 113458e..7aa0648 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -88,7 +88,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
       userClicksStream
 
         // Join the stream against the table.
-        .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
 
         // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
         .map((_, regionWithClicks) => regionWithClicks)
@@ -180,7 +180,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
 
     streamsConfiguration
   }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 9495ea7..e8b9f0f 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -142,7 +142,7 @@ class TopologyTest extends JUnitSuite {
   
       val clicksPerRegion: KTable[String, Long] =
         userClicksStream
-          .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+          .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
           .map((_, regionWithClicks) => regionWithClicks)
           .groupByKey
           .reduce(_ + _)

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.