Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/23 20:36:04 UTC

[jira] [Commented] (KAFKA-6670) Implement a Scala wrapper library for Kafka Streams

    [ https://issues.apache.org/jira/browse/KAFKA-6670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448824#comment-16448824 ] 

ASF GitHub Bot commented on KAFKA-6670:
---------------------------------------

guozhangwang closed pull request #4756: KAFKA-6670: Implement a Scala wrapper library for Kafka Streams
URL: https://github.com/apache/kafka/pull/4756
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/.gitignore b/.gitignore
index ba594ff03d4..04f8feed0ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,4 +50,4 @@ docs/generated/
 
 kafkatest.egg-info/
 systest/
-
+*.swp
diff --git a/build.gradle b/build.gradle
index 69f560ec38f..cd68cab5d1e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -124,7 +124,8 @@ if (new File('.git').exists()) {
         '**/id_rsa',
         '**/id_rsa.pub',
         'checkstyle/suppressions.xml',
-        'streams/quickstart/java/src/test/resources/projects/basic/goal.txt'
+        'streams/quickstart/java/src/test/resources/projects/basic/goal.txt',
+        'streams/streams-scala/logs/*'
     ])
   }
 }
@@ -518,7 +519,7 @@ for ( sv in availableScalaVersions ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:test-utils', 'streams:examples'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs
 
 /** Create one task per default Scala version */
 def withDefScalaVersions(taskName) {
@@ -733,6 +734,8 @@ project(':core') {
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
     from(project(':streams').configurations.runtime) { into("libs/") }
+    from(project(':streams:streams-scala').jar) { into("libs/") }
+    from(project(':streams:streams-scala').configurations.runtime) { into("libs/") }
     from(project(':streams:test-utils').jar) { into("libs/") }
     from(project(':streams:test-utils').configurations.runtime) { into("libs/") }
     from(project(':streams:examples').jar) { into("libs/") }
@@ -965,6 +968,46 @@ project(':streams') {
   }
 }
 
+project(':streams:streams-scala') {
+  println "Building project 'streams-scala' with Scala version ${versions.scala}"
+  apply plugin: 'scala'
+  archivesBaseName = "kafka-streams-scala"
+
+  dependencies {
+    compile project(':streams')
+
+    compile libs.scalaLibrary
+
+    testCompile project(':core')
+    testCompile project(':core').sourceSets.test.output
+    testCompile project(':streams').sourceSets.test.output
+    testCompile project(':clients').sourceSets.test.output
+    testCompile libs.scalaLogging
+
+    testCompile libs.junit
+    testCompile libs.scalatest
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/streams/scala/**"
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.runtime) {
+      exclude('kafka-streams*')
+    }
+    into "$buildDir/dependant-libs-${versions.scala}"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+
+}
+
 project(':streams:test-utils') {
   archivesBaseName = "kafka-streams-test-utils"
 
diff --git a/docs/api.html b/docs/api.html
index 97771861883..015d5140bd0 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -78,6 +78,19 @@ <h3><a id="streamsapi" href="#streamsapi">2.3 Streams API</a></h3>
 		&lt;/dependency&gt;
 	</pre>
 
+	<p>
+	When using Scala you may optionally include the <code>kafka-streams-scala</code> library.  Additional documentation on using the Kafka Streams DSL for Scala is available <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">in the developer guide</a>.
+	<p>
+	To use the Kafka Streams DSL for Scala built for Scala 2.11, add the following Maven dependency:
+
+	<pre class="brush: xml;">
+		&lt;dependency&gt;
+			&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+			&lt;artifactId&gt;kafka-streams-scala_2.11&lt;/artifactId&gt;
+			&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+		&lt;/dependency&gt;
+	</pre>
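+
+	<p>
+	If you use SBT instead of Maven, the equivalent dependency (as also shown in the developer guide) is:
+
+	<pre class="brush: scala;">
+		libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "{{fullDotVersion}}"
+	</pre>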
+
 	<h3><a id="connectapi" href="#connectapi">2.4 Connect API</a></h3>
 
 	The Connect API allows implementing connectors that continually pull from some source data system into Kafka or push from Kafka into some sink data system.
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 51bd5857eec..d8d7b4c985a 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -44,12 +44,15 @@
       <ul class="simple">
           <li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring SerDes</a></li>
           <li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default SerDes</a></li>
-          <li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a><ul>
+          <li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a></li>
+          <ul>
               <li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
               <li><a class="reference internal" href="#avro" id="id5">Avro</a></li>
               <li><a class="reference internal" href="#json" id="id6">JSON</a></li>
               <li><a class="reference internal" href="#further-serdes" id="id7">Further serdes</a></li>
           </ul>
+          <li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit SerDes</a></li>
+      </ul>
     <div class="section" id="configuring-serdes">
       <h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
       <p>SerDes specified in the Streams configuration via <code class="docutils literal"><span class="pre">StreamsConfig</span></code> are used as the default in your Kafka Streams application.</p>
@@ -155,23 +158,27 @@ <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></
           <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java">PageViewTypedDemo</a>
           example demonstrates how to use this JSON serde.</p>
       </div>
-    <div class="section" id="implementing-custom-serdes">
-      <span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
-      <p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of
-        existing SerDes (see previous section).  Typically, your workflow will be similar to:</p>
-      <ol class="arabic simple">
-        <li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
-          <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
-        <li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
-          <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
-        <li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
-          <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
-          which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in
-          <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
-          such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;,</span> <span class="pre">Deserializer&lt;T&gt;)</span></code>.</li>
-      </ol>
-</div>
-</div>
+      <div class="section" id="implementing-custom-serdes">
+        <span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
+        <p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of
+          existing SerDes (see previous section).  Typically, your workflow will be similar to:</p>
+        <ol class="arabic simple">
+          <li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
+            <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
+          <li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
+            <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
+          <li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
+            <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
+            which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in
+            <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
+            such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;,</span> <span class="pre">Deserializer&lt;T&gt;)</span></code>.</li>
+        </ol>
+      </div>
+    </div>
+    <div class="section" id="scala-dsl-serdes">
+      <h2>Kafka Streams DSL for Scala Implicit SerDes<a class="headerlink" href="#scala-dsl-serdes" title="Permalink to this headline"></a></h2>
+      <p>When using the <a href="dsl-api.html#scala-dsl">Kafka Streams DSL for Scala</a> you're not required to configure default SerDes; in fact, doing so is not supported.  SerDes are instead provided implicitly by default implementations for common primitive datatypes.  See the <a href="dsl-api.html#scala-dsl-implicit-serdes">Implicit SerDes</a> and <a href="dsl-api.html#scala-dsl-user-defined-serdes">User-Defined SerDes</a> sections in the DSL API documentation for details.</p>
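+      <p>For example, the following fragment (a minimal sketch mirroring the WordCount sample in the DSL API documentation) shows that with the bundled implicit SerDes in scope, no explicit SerDes configuration is needed:</p>
+      <pre class="brush: scala;">
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.scala.kstream._
+
+// Implicit SerDes for the common primitive types
+import DefaultSerdes._
+// Implicit conversions between the Scala and Java classes
+import ImplicitConversions._
+
+val builder = new StreamsBuilder()
+// No explicit SerDes or Consumed needs to be supplied; they are taken care of
+// by the implicit SerDes in scope
+val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
+      </pre>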
+    </div>
 
 
                </div>
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 8552bcc8674..ad51884dc4c 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -67,6 +67,12 @@
                 </li>
                 <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
                 <li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
+                <li><a class="reference internal" href="#scala-dsl" id="id27">Kafka Streams DSL for Scala</a></li>
+                <ul>
+                    <li><a class="reference internal" href="#scala-dsl-sample-usage" id="id28">Sample Usage</a></li>
+                    <li><a class="reference internal" href="#scala-dsl-implicit-serdes" id="id29">Implicit SerDes</a></li>
+                    <li><a class="reference internal" href="#scala-dsl-user-defined-serdes" id="id30">User-Defined SerDes</a></li>
+                </ul>
             </ul>
         </div>
         <div class="section" id="overview">
@@ -3153,11 +3159,183 @@ <h2><a class="toc-backref" href="#id7">Overview</a><a class="headerlink" href="#
                     <a class="reference internal" href="../../connect/index.html#kafka-connect"><span class="std std-ref">Kafka Connect API</span></a> instead.  However, if you do use such a sink processor, please be aware that
                     it is now your responsibility to guarantee message delivery semantics when talking to such external systems (e.g., to
                     retry on delivery failure or to prevent message duplication).</p>
-</div>
-</div>
+            </div>
+        </div>
         <div class="section" id="testing-a-streams-app">
             <a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline"><h2>Testing a Streams application</a></h2>
             Kafka Streams comes with a <code>test-utils</code> module to help you test your application <a href="testing.html">here</a>.
+            </div>
+        </div>
+        <div class="section" id="scala-dsl">
+            <span id="streams-developer-guide-dsl-scala-dsl"></span><h2><a class="toc-backref" href="#id27">Kafka Streams DSL for Scala</a><a class="headerlink" href="#scala-dsl" title="Permalink to this headline"></a></h2>
+            <p id="scala-dsl-motivation">The Kafka Streams DSL Java APIs are based on the Builder design pattern, which allows users to incrementally build the target functionality using lower level compositional fluent APIs.  These APIs can be called from Scala, but there are several issues:</p>
+            <ol class="last arabic simple">
+              <li><strong>Additional type annotations</strong> - The Java APIs use Java generics in a way that is not fully compatible with the type inference of the Scala compiler. Hence the user has to add type annotations to the Scala code, which seems rather non-idiomatic in Scala.</li>
+              <li><strong>Verbosity</strong> - In some cases the Java APIs appear too verbose compared to idiomatic Scala.</li>
+              <li><strong>Type Unsafety</strong> - The Java APIs offer some options where compile-time type safety is sometimes subverted and can result in runtime errors. This stems from the fact that the SerDes defined as part of the config are not type-checked at compile time. Hence any missing SerDes can result in runtime errors.</li>
+            </ol>
+            <p id="scala-dsl-overview">The Kafka Streams DSL for Scala library is a wrapper over the existing Java APIs for Kafka Streams DSL that addresses the concerns raised above.
+              It does not attempt to provide idiomatic Scala APIs that one would implement in a Scala library developed from scratch. The intention is to make the Java APIs more usable in Scala through better type inference, enhanced expressiveness, and less boilerplate.
+            </p>
+            <p>The library wraps Java Stream DSL APIs in Scala thereby providing:</p>
+            <ol class="last arabic simple">
+              <li>Better type inference in Scala.</li>
+              <li>Less boilerplate in application code.</li>
+              <li>The usual builder-style composition that developers get with the original Java API.</li>
+              <li>Implicit serializers and de-serializers leading to better abstraction and less verbosity.</li>
+              <li>Better type safety during compile time.</li>
+            </ol>
+            <p>All functionality provided by the Kafka Streams DSL for Scala is under the root package name of <code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala</span></code>.</p>
+            <p>Many of the public facing types from the Java API are wrapped. The following Scala abstractions are available to the user:</p>
+            <ul>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.StreamsBuilder</span></code></li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.KStream</span></code></li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.KTable</span></code></li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.KGroupedStream</span></code></li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.KGroupedTable</span></code></li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.SessionWindowedKStream</span></code></li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.TimeWindowedKStream</span></code></li>
+            </ul>
+            <p>The library also has several utility abstractions and modules that the user needs to import for proper semantics (a typical set of imports is sketched after the list below).</p>
+            <ul>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ImplicitConversions</span></code>: Module that brings into scope the implicit conversions between the Scala and Java classes.</li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.DefaultSerdes</span></code>: Module that brings into scope the implicit values of all primitive SerDes.</li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ScalaSerde</span></code>: Base abstraction that can be used to implement custom SerDes in a type safe way.</li>
+            </ul>
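+            <p>As a minimal illustration, a typical set of imports for an application using these modules looks like the following (only the modules you actually need have to be imported):</p>
+            <pre class="brush: scala;">
+// Scala wrappers such as StreamsBuilder and the kstream abstractions
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.scala.kstream._
+
+// Implicit conversions between the Scala and Java classes
+import ImplicitConversions._
+// Implicit SerDes for the common primitive types
+import DefaultSerdes._
+            </pre>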
+            <p>The library is cross-built with Scala 2.11 and 2.12.  To reference the library compiled against Scala 2.11, add the following to your Maven <code>pom.xml</code>:</p>
+            <pre class="brush: xml;">
+              &lt;dependency&gt;
+                &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+                &lt;artifactId&gt;kafka-streams-scala_2.11&lt;/artifactId&gt;
+                &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+              &lt;/dependency&gt;
+            </pre>
+            <p>To use the library compiled against Scala 2.12 replace the <code class="docutils literal"><span class="pre">artifactId</span></code> with <code class="docutils literal"><span class="pre">kafka-streams-scala_2.12</span></code>.</p>
+            <p>When using SBT, you can reference the correct library as follows:</p>
+            <pre class="brush: scala;">
+              libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "{{fullDotVersion}}"
+            </pre>
+            <div class="section" id="scala-dsl-sample-usage">
+              <span id="streams-developer-guide-dsl-sample-usage"></span><h3><a class="toc-backref" href="#id28">Sample Usage</a><a class="headerlink" href="#scala-dsl-sample-usage" title="Permalink to this headline"></a></h3>
+              <p>The library works by wrapping the original Java abstractions of Kafka Streams within a Scala wrapper object and then using implicit conversions between them. All the Scala abstractions are named identically to the corresponding Java abstractions, but they reside in a different package of the library, e.g. the Scala class <code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.StreamsBuilder</span></code> is a wrapper around <code class="docutils literal"><span class="pre">org.apache.kafka.streams.StreamsBuilder</span></code>, <code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.kstream.KStream</span></code> is a wrapper around <code class="docutils literal"><span class="pre">org.apache.kafka.streams.kstream.KStream</span></code>, and so on.</p>
+              <p>Here's an example of the classic WordCount program that uses the Scala <code class="docutils literal"><span class="pre">StreamsBuilder</span></code> to build an instance of <code class="docutils literal"><span class="pre">KStream</span></code>, which is a wrapper around the Java <code class="docutils literal"><span class="pre">KStream</span></code>. Then we reify to a table and get a <code class="docutils literal"><span class="pre">KTable</span></code>, which, again, is a wrapper around the Java <code class="docutils literal"><span class="pre">KTable</span></code>.</p>
+              <p>The net result is that the following code is structured just like code that uses the Java API, but written in Scala and with far fewer type annotations than calling the Java API directly from Scala.  The WordCount implementation below demonstrates the difference between the Scala and Java APIs.</p>
+              <pre class="brush: scala;">
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
+
+object WordCountApplication extends App {
+  import DefaultSerdes._
+  import ImplicitConversions._
+
+  val config: Properties = {
+    val p = new Properties()
+    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
+    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
+    p
+  }
+
+  val builder = new StreamsBuilder()
+  val textLines = builder.stream[String, String]("TextLinesTopic")
+  val wordCounts = textLines
+    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
+    .groupBy((_, word) => word)
+    .count(Materialized.as("counts-store"))
+  wordCounts.toStream.to("WordsWithCountsTopic")
+
+  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
+  streams.start()
+
+  sys.ShutdownHookThread {
+    streams.close(10, TimeUnit.SECONDS)
+  }
+}
+              </pre>
+              <p>In the above code snippet, we don't have to provide any SerDes, <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> explicitly. They will also not be dependent on any SerDes specified in the config.  <strong>In fact all SerDes specified in the config will be ignored by the Scala APIs</strong>. All SerDes and <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be handled through implicit SerDes as discussed later in the <a href="#scala-dsl-implicit-serdes">Implicit SerDes</a> section. The complete independence from configuration based SerDes is what makes this library completely typesafe.  Any missing instances of SerDes, <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> will be flagged as a compile time error.</p>
+            </div>
+            <div class="section" id="scala-dsl-implicit-serdes">
+              <span id="streams-developer-guide-dsl-scala-dsl-implicit-serdes"></span><h3><a class="toc-backref" href="#id29">Implicit SerDes</a><a class="headerlink" href="#scala-dsl-implicit-serdes" title="Permalink to this headline"></a></h3>
+              <p>One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code>. And the user has to supply them every time through the <code class="docutils literal"><span class="pre">with</span></code> function of these classes.</p>
+              <p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <code class="docutils literal"><span class="pre">Serialized</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> available in scope.</p>
+              <p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope.  A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p>
+              <p>Here's an example:</p>
+              <pre class="brush: scala;">
+// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
+// that will set up all Serialized, Produced, Consumed and Joined instances.
+// So all APIs below that accept Serialized, Produced, Consumed or Joined will
+// get these instances automatically
+import DefaultSerdes._
+
+val builder = new StreamsBuilder()
+
+val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+
+val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+
+// The following code fragment does not have a single instance of Serialized,
+// Produced, Consumed or Joined supplied explicitly.
+// All of them are taken care of by the implicit SerDes imported by DefaultSerdes
+val clicksPerRegion: KTable[String, Long] =
+  userClicksStream
+    .leftJoin(userRegionsTable,
+      (clicks: Long, region: String) =>
+        (if (region == null) "UNKNOWN" else region, clicks))
+    .map((_, regionWithClicks) => regionWithClicks)
+    .groupByKey
+    .reduce(_ + _)
+
+clicksPerRegion.toStream.to(outputTopic)
+              </pre>
+              <p>Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:</p>
+              <ol>
+                <li>The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.</li>
+                <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import DefaultSerdes._</span></code> brings all necessary SerDes into scope.</li>
+                <li>This is an example of compile-time type safety that we don't have in the Java APIs.</li>
+                <li>The code looks less verbose and more focused on the actual transformation that it performs on the data stream.</li>
+              </ol>
+            </div>
+            <div class="section" id="scala-dsl-user-defined-serdes">
+              <span id="streams-developer-guide-dsl-scala-dsl-user-defined-serdes"></span><h3><a class="toc-backref" href="#id30">User-Defined SerDes</a><a class="headerlink" href="#scala-dsl-user-defined-serdes" title="Permalink to this headline"></a></h3>
+              <p>When the default primitive SerDes are not enough and we need to define custom SerDes, the usage is exactly the same as above. Just define the implicit SerDes and start building the stream transformation. Here's an example with <code class="docutils literal"><span class="pre">AvroSerde</span></code>:</p>
+              <pre class="brush: scala;">
+// domain object as a case class
+case class UserClicks(clicks: Long)
+
+// An implicit Serde implementation for the values we want to
+// serialize as avro
+implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
+
+// Primitive SerDes
+import DefaultSerdes._
+
+// And then business as usual ..
+
+val userClicksStream: KStream[String, UserClicks] = builder.stream(userClicksTopic)
+
+val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+
+// Compute the total per region by summing the individual click counts per region.
+val clicksPerRegion: KTable[String, Long] =
+ userClicksStream
+
+   // Join the stream against the table.
+   .leftJoin(userRegionsTable, (clicks: UserClicks, region: String) => (if (region == null) "UNKNOWN" else region, clicks.clicks))
+
+   // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+   .map((_, regionWithClicks) => regionWithClicks)
+
+   // Compute the total per region by summing the individual click counts per region.
+   .groupByKey
+   .reduce(_ + _)
+
+// Write the (continuously updating) results to the output topic.
+clicksPerRegion.toStream.to(outputTopic)
+              </pre>
+              <p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
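+              <p>As an illustrative sketch (not part of the library), such an implicit Serde can also be assembled from a plain serializer/deserializer pair using the existing Java helper <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>; the string encoding used here is purely hypothetical:</p>
+              <pre class="brush: scala;">
+import java.util
+import java.nio.charset.StandardCharsets
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}
+
+// Same domain object as above
+case class UserClicks(clicks: Long)
+
+// A hand-rolled serializer/deserializer pair; the UTF-8 string encoding is
+// just an illustration, any binary format would do
+val userClicksSerializer: Serializer[UserClicks] = new Serializer[UserClicks] {
+  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+  override def serialize(topic: String, data: UserClicks): Array[Byte] =
+    if (data == null) null else data.clicks.toString.getBytes(StandardCharsets.UTF_8)
+  override def close(): Unit = ()
+}
+
+val userClicksDeserializer: Deserializer[UserClicks] = new Deserializer[UserClicks] {
+  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+  override def deserialize(topic: String, data: Array[Byte]): UserClicks =
+    if (data == null) null else UserClicks(new String(data, StandardCharsets.UTF_8).toLong)
+  override def close(): Unit = ()
+}
+
+// The implicit Serde that the Scala DSL will pick up
+implicit val userClicksSerde: Serde[UserClicks] =
+  Serdes.serdeFrom(userClicksSerializer, userClicksDeserializer)
+              </pre>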
+            </div>
         </div>
 </div>
 
diff --git a/docs/streams/developer-guide/write-streams.html b/docs/streams/developer-guide/write-streams.html
index 76bd2c371e7..23ca764bb5f 100644
--- a/docs/streams/developer-guide/write-streams.html
+++ b/docs/streams/developer-guide/write-streams.html
@@ -39,13 +39,13 @@
           <li><a class="reference internal" href="#using-kafka-streams-within-your-application-code" id="id2">Using Kafka Streams within your application code</a></li>
           <li><a class="reference internal" href="#testing-a-streams-app" id="id3">Testing a Streams application</a></li>
       </ul>
-    <p>Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application.
+    <p>Any Java or Scala application that makes use of the Kafka Streams library is considered a Kafka Streams application.
       The computational logic of a Kafka Streams application is defined as a <a class="reference internal" href="../core-concepts#streams_topology"><span class="std std-ref">processor topology</span></a>,
       which is a graph of stream processors (nodes) and streams (edges).</p>
     <p>You can define the processor topology with the Kafka Streams APIs:</p>
     <dl class="docutils">
       <dt><a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl"><span class="std std-ref">Kafka Streams DSL</span></a></dt>
-      <dd>A high-level API that provides provides the most common data transformation operations such as <code class="docutils literal"><span class="pre">map</span></code>, <code class="docutils literal"><span class="pre">filter</span></code>, <code class="docutils literal"><span class="pre">join</span></code>, and <code class="docutils literal"><span class="pre">aggregations</span></code> out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.</dd>
+        <dd>A high-level API that provides the most common data transformation operations such as <code class="docutils literal"><span class="pre">map</span></code>, <code class="docutils literal"><span class="pre">filter</span></code>, <code class="docutils literal"><span class="pre">join</span></code>, and <code class="docutils literal"><span class="pre">aggregations</span></code> out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.  If you're writing a Scala application, you can use the <a href="dsl-api.html#scala-dsl"><span class="std std-ref">Kafka Streams DSL for Scala</span></a> library, which removes much of the Java/Scala interoperability boilerplate compared to working directly with the Java DSL.</dd>
       <dt><a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a></dt>
       <dd>A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).</dd>
     </dl>
@@ -69,7 +69,7 @@
               </thead>
               <tbody valign="top">
               <tr class="row-even"><td><code class="docutils literal"><span class="pre">org.apache.kafka</span></code></td>
-                  <td><code>kafka-streams</code></td>
+                  <td><code class="docutils literal"><span class="pre">kafka-streams</span></code></td>
                   <td><code class="docutils literal"><span class="pre">{{fullDotVersion}}</span></code></td>
                   <td>(Required) Base library for Kafka Streams.</td>
               </tr>
@@ -78,6 +78,11 @@
                   <td><code class="docutils literal"><span class="pre">{{fullDotVersion}}</span></code></td>
                   <td>(Required) Kafka client library.  Contains built-in serializers/deserializers.</td>
               </tr>
+              <tr class="row-even"><td><code class="docutils literal"><span class="pre">org.apache.kafka</span></code></td>
+                  <td><code class="docutils literal"><span class="pre">kafka-streams-scala</span></code></td>
+                  <td><code class="docutils literal"><span class="pre">{{fullDotVersion}}</span></code></td>
+                  <td>(Optional) Kafka Streams DSL for Scala library to write Scala Kafka Streams applications.  When not using SBT, you will need to suffix the artifact ID with the Scala binary version your application is using (<code class="docutils literal"><span class="pre">_2.11</span></code>, <code class="docutils literal"><span class="pre">_2.12</span></code>).</td>
+              </tr>
               </tbody>
           </table>
           <div class="admonition tip">
@@ -85,19 +90,24 @@
               <p class="last">See the section <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a> for more information about Serializers/Deserializers.</p>
           </div>
           <p>Example <code class="docutils literal"><span class="pre">pom.xml</span></code> snippet when using Maven:</p>
-          <div class="highlight-xml"><div class="highlight"><pre><span></span><span class="nt">&lt;dependency&gt;</span>
-    <span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
-    <span class="nt">&lt;artifactId&gt;</span>kafka-streams<span class="nt">&lt;/artifactId&gt;</span>
-    <span class="nt">&lt;version&gt;</span>{{fullDotVersion}}<span class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span>
-<span class="nt">&lt;dependency&gt;</span>
-    <span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
-    <span class="nt">&lt;artifactId&gt;</span>kafka-clients<span class="nt">&lt;/artifactId&gt;</span>
-    <span class="nt">&lt;version&gt;</span>{{fullDotVersion}}<span class="nt">&lt;/version&gt;</span>
-<span class="nt">&lt;/dependency&gt;</span>
-              
-</pre></div>
-          </div>
+          <pre class="brush: xml;">
+&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+    &lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
+    &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+&lt;/dependency&gt;
+&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+    &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
+    &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+&lt;/dependency&gt;
+&lt;!-- Optionally include the Kafka Streams DSL for Scala (Scala 2.11 build) --&gt;
+&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+    &lt;artifactId&gt;kafka-streams-scala_2.11&lt;/artifactId&gt;
+    &lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
+&lt;/dependency&gt;
+          </pre>
       </div>
     <div class="section" id="using-kafka-streams-within-your-application-code">
       <h2>Using Kafka Streams within your application code<a class="headerlink" href="#using-kafka-streams-within-your-application-code" title="Permalink to this headline"></a></h2>
diff --git a/docs/streams/index.html b/docs/streams/index.html
index fe8504d77ff..7ac6867ab24 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -59,7 +59,7 @@ <h3>TOUR OF THE STREAMS API</h3>
                 </div>
             </div>
        </div>
-       <hr class="separator"> 
+       <hr class="separator">
        <div class="use-item-section">
            <div class="use__list__sec">
                <h3>Why you'll love using Kafka Streams!</h3>
@@ -68,18 +68,18 @@ <h3>Why you'll love using Kafka Streams!</h3>
                   <li>Deploy to containers, VMs, bare metal, cloud</li>
                   <li>Equally viable for small, medium, &amp; large use cases</li>
                   <li>Fully integrated with Kafka security</li>
-                  <li>Write standard Java applications</li>
+                  <li>Write standard Java and Scala applications</li>
                   <li>Exactly-once processing semantics</li>
                   <li>No separate processing cluster required</li>
                   <li>Develop on Mac, Linux, Windows</li>
-                  
+
                </ul>
            </div>
            <div class="first__app__cta">
                <a href="/{{version}}/documentation/streams/tutorial" class="first__app__btn">Write your first app</a>
            </div>
        </div>
-       <hr class="separator" id="streams-use-cases"> 
+       <hr class="separator" id="streams-use-cases">
         <h3 class="stream__text">Kafka Streams use cases</h3>
          <div class="customers__grid">
            <div class="customer__grid">
@@ -101,7 +101,7 @@ <h3 class="stream__text">Kafka Streams use cases</h3>
                  <a href="https://www.confluent.io/blog/ranking-websites-real-time-apache-kafkas-streams-api/" target='blank'> event streams</a> enables our technical team to do near-real time business intelligence.
                </p>
            </div>
-           </div>  
+           </div>
            <div class="customer__grid">
              <div class="customer__item  streams_logo_grid streams__line__grid">
                <a href="https://engineering.linecorp.com/en/blog/detail/80" target="_blank" class="grid__logo__link">
@@ -119,7 +119,7 @@ <h3 class="stream__text">Kafka Streams use cases</h3>
                    <a href="https://medium.com/@Pinterest_Engineering/using-kafka-streams-api-for-predictive-budgeting-9f58d206c996" target="_blank">Pinterest uses Apache Kafka and the Kafka Streams</a> at large scale to power the real-time, predictive budgeting system of their advertising infrastructure. With Kafka Streams, spend predictions are more accurate than ever.
                </p>
              </div>
-           </div> 
+           </div>
            <div class="customer__grid">
              <div class="customer__item  streams_logo_grid streams__rabobank__grid">
                <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/" target="_blank" class="grid__logo__link">
@@ -127,7 +127,7 @@ <h3 class="stream__text">Kafka Streams use cases</h3>
                </a>
                  <p class="grid__item__customer__description">Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing amount of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real-time upon financial events and is <a href="https://www.confluent.io/blog/real-time-financial-alerts-rabobank-apache-kafkas-streams-api/" target="_blank">built using Kafka Streams.</a></p>
              </div>
-           </div>        
+           </div>
            <div class="customer__grid">
              <div class="customer__item streams_logo_grid streams__ny__grid">
                <a href="https://speakerdeck.com/xenji/kafka-and-debezium-at-trivago-code-dot-talks-2017-edition" target="_blank" class="grid__logo__link">
@@ -137,19 +137,19 @@ <h3 class="stream__text">Kafka Streams use cases</h3>
                    Trivago is a global hotel search platform. We are focused on reshaping the way travelers search for and compare hotels, while enabling hotel advertisers to grow their businesses by providing access to a broad audience of travelers via our websites and apps. As of 2017, we offer access to approximately 1.8 million hotels and other accommodations in over 190 countries. We use Kafka, Kafka Connect, and Kafka Streams to <a href="https://speakerdeck.com/xenji/kafka-and-debezium-at-trivago-code-dot-talks-2017-edition" target="_blank">enable our developers</a> to access data freely in the company. Kafka Streams powers parts of our analytics pipeline and delivers endless options to explore and operate on the data sources we have at hand.
                </p>
              </div>
-           </div>  
-         
+           </div>
+
        </div>
        <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
        <p>The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale</p>
-       
+
        <div class="code-example">
            <div class="btn-group">
                <a class="selected b-java-8" data-section="java-8">Java 8+</a>
                <a class="b-java-7" data-section="java-7">Java 7</a>
                <a class="b-scala" data-section="scala">Scala</a>
            </div>
-       
+
            <div class="code-example__snippet b-java-8 selected">
                <pre class="brush: java;">
                    import org.apache.kafka.common.serialization.Serdes;
@@ -190,7 +190,7 @@ <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
                    }
                </pre>
            </div>
-       
+
            <div class="code-example__snippet b-java-7">
                <pre class="brush: java;">
                    import org.apache.kafka.common.serialization.Serdes;
@@ -208,16 +208,16 @@ <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
 
                    import java.util.Arrays;
                    import java.util.Properties;
-       
+
                    public class WordCountApplication {
-       
+
                        public static void main(final String[] args) throws Exception {
                            Properties config = new Properties();
                            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                            config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                            config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-       
+
                            StreamsBuilder builder = new StreamsBuilder();
                            KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
                            KTable&lt;String, Long&gt; wordCounts = textLines
@@ -237,67 +237,59 @@ <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
 
 
                            wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
-       
+
                            KafkaStreams streams = new KafkaStreams(builder.build(), config);
                            streams.start();
                        }
-       
+
                    }
                </pre>
            </div>
-       
+
            <div class="code-example__snippet b-scala">
                <pre class="brush: scala;">
-                   import java.lang.Long
-                   import java.util.Properties
-                   import java.util.concurrent.TimeUnit
-
-                   import org.apache.kafka.common.serialization._
-                   import org.apache.kafka.common.utils.Bytes
-                   import org.apache.kafka.streams._
-                   import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
-                   import org.apache.kafka.streams.state.KeyValueStore
-
-                   import scala.collection.JavaConverters.asJavaIterableConverter
-
-                   object WordCountApplication {
-
-                       def main(args: Array[String]) {
-                           val config: Properties = {
-                               val p = new Properties()
-                               p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
-                               p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
-                               p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
-                               p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
-                               p
-                           }
-
-                           val builder: StreamsBuilder = new StreamsBuilder()
-                           val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
-                           val wordCounts: KTable[String, Long] = textLines
-                               .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
-                               .groupBy((_, word) => word)
-                               .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
-                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))
-
-                           val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
-                           streams.start()
-
-                           Runtime.getRuntime.addShutdownHook(new Thread(() => {
-                               streams.close(10, TimeUnit.SECONDS)
-                           }))
-                       }
+import java.util.Properties
+import java.util.concurrent.TimeUnit
 
-                   }
+import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
+
+object WordCountApplication extends App {
+  import DefaultSerdes._
+  import ImplicitConversions._
+
+  val config: Properties = {
+    val p = new Properties()
+    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
+    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
+    p
+  }
+
+  val builder: StreamsBuilder = new StreamsBuilder()
+  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
+  val wordCounts: KTable[String, Long] = textLines
+    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
+    .groupBy((_, word) => word)
+    .count(Materialized.as("counts-store"))
+  wordCounts.toStream.to("WordsWithCountsTopic")
+
+  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
+  streams.start()
+
+  sys.ShutdownHookThread {
+    streams.close(10, TimeUnit.SECONDS)
+  }
+}
                </pre>
            </div>
        </div>
-       
+
        <div class="pagination">
            <a href="/{{version}}/documentation" class="pagination__btn pagination__btn__prev">Previous</a>
            <a href="/{{version}}/documentation/streams/quickstart" class="pagination__btn pagination__btn__next">Next</a>
        </div>
-     
+
 </script>
 <!--#include virtual="../../includes/_header.htm" -->
 <!--#include virtual="../../includes/_top.htm" -->
@@ -316,52 +308,52 @@ <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
 <!--#include virtual="../../includes/_footer.htm" -->
 <script>
   $(function() {
-         
+
          // Show selected style on nav item
          $('.b-nav__streams').addClass('selected');
-    
-         $('.video_list_1').click(function(){    
+
+         $('.video_list_1').click(function(){
              $('.video_2').attr('src', $('.video_2').attr('src'));
              $('.video_3').attr('src', $('.video_3').attr('src'));
              $('.video_4').attr('src', $('.video_4').attr('src'));
 
            });
 
-         $('.video_list_2').click(function(){    
+         $('.video_list_2').click(function(){
                $('.video_1').attr('src', $('.video_1').attr('src'));
                $('.video_3').attr('src', $('.video_3').attr('src'));
                $('.video_4').attr('src', $('.video_4').attr('src'));
 
            });
 
-         $('.video_list_3').click(function(){    
+         $('.video_list_3').click(function(){
               $('.video_1').attr('src', $('.video_1').attr('src'));
               $('.video_2').attr('src', $('.video_2').attr('src'));
               $('.video_4').attr('src', $('.video_4').attr('src'));
            });
 
-         $('.video_list_4').click(function(){    
+         $('.video_list_4').click(function(){
               $('.video_1').attr('src', $('.video_1').attr('src'));
               $('.video_2').attr('src', $('.video_2').attr('src'));
               $('.video_3').attr('src', $('.video_3').attr('src'));
            });
-           
+
 
           //sticky secondary nav
           var $navbar = $(".sub-nav-sticky"),
                y_pos = $navbar.offset().top,
                height = $navbar.height();
-       
+
            $(window).scroll(function() {
                var scrollTop = $(window).scrollTop();
-           
+
                if (scrollTop > y_pos - height) {
                    $navbar.addClass("navbar-fixed")
                } else if (scrollTop <= y_pos) {
                    $navbar.removeClass("navbar-fixed")
                }
            });
-       
+
          // Display docs subnav items
          $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
          // Show selected code example
@@ -371,4 +363,4 @@ <h3 style="margin-top: 5.3rem;">Hello Kafka Streams</h3>
              $(targetClass).addClass('selected');
          });
        });
-</script>
\ No newline at end of file
+</script>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index fdc0af1de1f..7ffafb547a8 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -112,9 +112,8 @@ <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API
         In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
         to let users specify inner serdes if the default serde classes are windowed serdes.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
-    /<p>
-
-     <p>
+    </p>
+    <p>
         We have deprecated constructors of <code>KafkaStreams</code> that take a <code>StreamsConfig</code> as parameter.
         Please use the other corresponding constructors that accept <code>java.util.Properties</code> instead.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
@@ -127,6 +126,14 @@ <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API
       Forwarding based on child index is not supported in the new API any longer.
     </p>
 
+    <p>
+      Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala.  It wraps the core Kafka Streams DSL types to make them easier to call when
+      interoperating with Scala code.  For example, it accepts higher-order functions as parameters for transformations, avoiding the need for anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11; converts automatically between Java and Scala collection types; provides a way
+      to supply SerDes implicitly, reducing boilerplate in your application and making it more typesafe; and more.  For more information see the
+      <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka Streams DSL for Scala documentation</a> and
+      <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams">KIP-270</a>.
+    </p>
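+
+    <p>
+      As a small illustration (a fragment from the developer-guide WordCount sample, assuming the implicit SerDes imports are in scope), transformations take Scala function literals directly and need no explicit SerDes:
+    </p>
+
+    <pre class="brush: scala;">
+val wordCounts: KTable[String, Long] = textLines
+  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
+  .groupBy((_, word) => word)
+  .count(Materialized.as("counts-store"))
+    </pre>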
+
     <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
     <p>
         We have added support for methods in <code>ReadOnlyWindowStore</code> which allows for querying <code>WindowStore</code>s without the necessity of providing keys.
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index d6beba958dd..0d05da37648 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -60,6 +60,7 @@ versions += [
   log4j: "1.2.17",
   scalaLogging: "3.8.0",
   jaxb: "2.3.0",
+  jfreechart: "1.0.0",
   jopt: "5.0.4",
   junit: "4.12",
   kafka_0100: "0.10.0.1",
@@ -68,6 +69,7 @@ versions += [
   kafka_0110: "0.11.0.2",
   kafka_10: "1.0.1",
   lz4: "1.4.1",
+  mavenArtifact: "3.5.3",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
   powermock: "2.0.0-beta.5",
@@ -78,14 +80,11 @@ versions += [
   slf4j: "1.7.25",
   snappy: "1.1.7.1",
   zkclient: "0.10",
-  zookeeper: "3.4.10",
-  jfreechart: "1.0.0",
-  mavenArtifact: "3.5.3"
+  zookeeper: "3.4.10"
 ]
 
 libs += [
   activation: "javax.activation:activation:$versions.activation",
-  argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
   apacheda: "org.apache.directory.api:api-all:$versions.apacheda",
   apachedsCoreApi: "org.apache.directory.server:apacheds-core-api:$versions.apacheds",
   apachedsInterceptorKerberos: "org.apache.directory.server:apacheds-interceptor-kerberos:$versions.apacheds",
@@ -95,6 +94,7 @@ libs += [
   apachedsLdifPartition: "org.apache.directory.server:apacheds-ldif-partition:$versions.apacheds",
   apachedsMavibotPartition: "org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds",
   apachedsJdbmPartition: "org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds",
+  argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
@@ -105,6 +105,7 @@ libs += [
   jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
   jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
   jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
+  jfreechart: "jfree:jfreechart:$versions.jfreechart",
   jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
   jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
   jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 33702056d83..f36f32376ab 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -305,4 +305,19 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
         <Source name="RoundTripWorker.java"/>
         <Bug pattern="NN_NAKED_NOTIFY"/>
     </Match>
+
+    <Match>
+        <Package name="org.apache.kafka.streams.scala"/>
+        <Source name="FunctionConversions.scala"/>
+        <Bug pattern="EQ_UNUSUAL"/>
+    </Match>
+
+    <Match>
+        <Package name="org.apache.kafka.streams.scala"/>
+        <Or>
+            <Class name="org.apache.kafka.streams.scala.Serializer" />
+            <Class name="org.apache.kafka.streams.scala.Deserializer" />
+        </Or>
+        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
+    </Match>
 </FindBugsFilter>
diff --git a/settings.gradle b/settings.gradle
index 03136849fd5..19b3ef9f415 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples',
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
         'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
         'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/streams-scala/.gitignore b/streams/streams-scala/.gitignore
new file mode 100644
index 00000000000..bf11921177e
--- /dev/null
+++ b/streams/streams-scala/.gitignore
@@ -0,0 +1 @@
+/logs/
\ No newline at end of file
diff --git a/streams/streams-scala/NOTICE b/streams/streams-scala/NOTICE
new file mode 100644
index 00000000000..732e373e1c1
--- /dev/null
+++ b/streams/streams-scala/NOTICE
@@ -0,0 +1,3 @@
+Kafka Streams Scala
+Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+Copyright (C) 2017-2018 Alexis Seigneurin.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
new file mode 100644
index 00000000000..864fd19f87a
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams
+
+import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.common.utils.Bytes
+
+package object scala {
+  type ByteArrayKeyValueStore = KeyValueStore[Bytes, Array[Byte]]
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
new file mode 100644
index 00000000000..3f2840eeecb
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.nio.ByteBuffer
+
+import org.apache.kafka.common.serialization.{Serde, Serdes}
+import org.apache.kafka.common.utils.Bytes
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+
+/**
+ * Implicit values for default serdes.
+ * <p>
+ * Bring them in scope for default serializers / de-serializers to work.
+ */
+object DefaultSerdes {
+  implicit val stringSerde: Serde[String] = Serdes.String()
+  implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
+  implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
+  implicit val bytesSerde: Serde[Bytes] = Serdes.Bytes()
+  implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
+  implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
+  implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
+  implicit val shortSerde: Serde[Short] = Serdes.Short().asInstanceOf[Serde[Short]]
+  implicit val byteBufferSerde: Serde[ByteBuffer] = Serdes.ByteBuffer()
+
+  implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
+  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
+}
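
As a quick sketch (not part of the patch), importing the object above is all that is needed for the
implicit serde lookups used throughout the Scala API; note that the serdes are keyed on the Scala
primitive types (e.g. Serde[Long] rather than Serde[java.lang.Long]):

    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.scala.DefaultSerdes._

    object DefaultSerdesSketch {
      // Each of these resolves to one of the implicit vals defined above.
      val keySerde: Serde[String] = implicitly[Serde[String]]
      val countSerde: Serde[Long] = implicitly[Serde[Long]]
      val payloadSerde: Serde[Array[Byte]] = implicitly[Serde[Array[Byte]]]
    }
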
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
new file mode 100644
index 00000000000..9ce9838c7c9
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.KeyValue
+import org.apache.kafka.streams.kstream._
+import scala.collection.JavaConverters._
+import java.lang.{Iterable => JIterable}
+
+/**
+ * Implicit classes that offer conversions of Scala function literals to
+ * SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
+ * more expressive, with less boilerplate and more succinct.
+ * <p>
+ * For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
+ * have full support for SAM types. 
+ */
+object FunctionConversions {
+
+  implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
+    def asPredicate: Predicate[K, V] = new Predicate[K, V] {
+      override def test(key: K, value: V): Boolean = p(key, value)
+    }
+  }
+
+  implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal {
+    def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
+      override def apply(key: T, value: U): VR = f(key, value)
+    }
+    def asValueJoiner: ValueJoiner[T, U, VR] = new ValueJoiner[T, U, VR] {
+      override def apply(value1: T, value2: U): VR = f(value1, value2)
+    }
+  }
+
+  implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
+    def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
+      override def apply(key: K, value: V): KeyValue[KR, VR] = {
+        val (kr, vr) = f(key, value)
+        KeyValue.pair(kr, vr)
+      }
+    }
+  }
+
+  implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {
+    def asValueMapper: ValueMapper[V, VR] = new ValueMapper[V, VR] {
+      override def apply(value: V): VR = f(value)
+    }
+  }
+
+  implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
+    def asValueMapper: ValueMapper[V, JIterable[VR]] = new ValueMapper[V, JIterable[VR]] {
+      override def apply(value: V): JIterable[VR] = f(value).asJava
+    }
+  }
+
+  implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal {
+    def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = new ValueMapperWithKey[K, V, VR] {
+      override def apply(readOnlyKey: K, value: V): VR = f(readOnlyKey, value)
+    }
+  }
+
+  implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal {
+    def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] = new ValueMapperWithKey[K, V, JIterable[VR]] {
+      override def apply(readOnlyKey: K, value: V): JIterable[VR] = f(readOnlyKey, value).asJava
+    }
+  }
+
+  implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal {
+    def asAggregator: Aggregator[K, V, VA] = new Aggregator[K, V, VA] {
+      override def apply(key: K, value: V, aggregate: VA): VA = f(key, value, aggregate)
+    }
+  }
+
+  implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
+    def asMerger: Merger[K, VR] = new Merger[K, VR] {
+      override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
+    }
+  }
+
+  implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal {
+    def asReducer: Reducer[V] = new Reducer[V] {
+      override def apply(value1: V, value2: V): V = f(value1, value2)
+    }
+  }
+
+  implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal {
+    def asInitializer: Initializer[VA] = new Initializer[VA] {
+      override def apply(): VA = f()
+    }
+  }
+}
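
These conversions are mainly used internally by the wrapper classes in this patch, but they can also be
invoked explicitly when calling the Java API directly, which is the fallback on Scala 2.11 as noted in the
scaladoc above. A hypothetical sketch (the stream and function names are illustrative):

    import org.apache.kafka.streams.kstream.{KStream => KStreamJ}
    import org.apache.kafka.streams.scala.FunctionConversions._

    object FunctionConversionsSketch {
      // Explicitly converts a Scala function literal into the Java Predicate SAM interface.
      def nonEmptyValues(stream: KStreamJ[String, String]): KStreamJ[String, String] =
        stream.filter(((_: String, v: String) => v.nonEmpty).asPredicate)
    }
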
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
new file mode 100644
index 00000000000..000690ba0ab
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.kstream.{KStream => KStreamJ,
+  KTable => KTableJ,
+  KGroupedStream => KGroupedStreamJ,
+  SessionWindowedKStream => SessionWindowedKStreamJ,
+  TimeWindowedKStream => TimeWindowedKStreamJ,
+  KGroupedTable => KGroupedTableJ, _}
+
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.{KeyValue, Consumed}
+import org.apache.kafka.common.serialization.Serde
+
+import scala.language.implicitConversions
+
+/**
+ * Implicit conversions between the Scala wrapper objects and the underlying Java
+ * objects.
+ */
+object ImplicitConversions {
+
+  implicit def wrapKStream[K, V](inner: KStreamJ[K, V]): KStream[K, V] =
+    new KStream[K, V](inner)
+
+  implicit def wrapKGroupedStream[K, V](inner: KGroupedStreamJ[K, V]): KGroupedStream[K, V] =
+    new KGroupedStream[K, V](inner)
+
+  implicit def wrapSessionWindowedKStream[K, V](inner: SessionWindowedKStreamJ[K, V]): SessionWindowedKStream[K, V] =
+    new SessionWindowedKStream[K, V](inner)
+
+  implicit def wrapTimeWindowedKStream[K, V](inner: TimeWindowedKStreamJ[K, V]): TimeWindowedKStream[K, V] =
+    new TimeWindowedKStream[K, V](inner)
+
+  implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
+    new KTable[K, V](inner)
+
+  implicit def wrapKGroupedTable[K, V](inner: KGroupedTableJ[K, V]): KGroupedTable[K, V] =
+    new KGroupedTable[K, V](inner)
+
+  implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
+
+  // we would also like to allow users to supply serdes implicitly,
+  // and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
+
+  implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
+    Serialized.`with`(keySerde, valueSerde)
+
+  implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
+    Consumed.`with`(keySerde, valueSerde)
+
+  implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
+    Produced.`with`(keySerde, valueSerde)
+
+  implicit def joinedFromKeyValueOtherSerde[K, V, VO]
+    (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] =
+    Joined.`with`(keySerde, valueSerde, otherValueSerde)
+}
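
A sketch of the serde-driven conversions (not part of the patch; it assumes the default serdes from
DefaultSerdes are in scope):

    import org.apache.kafka.streams.{Consumed, KeyValue}
    import org.apache.kafka.streams.kstream.{Produced, Serialized}
    import org.apache.kafka.streams.scala.DefaultSerdes._
    import org.apache.kafka.streams.scala.ImplicitConversions._

    object ImplicitConversionsSketch {
      // Consumed/Produced/Serialized are materialized on demand from the implicit serdes,
      // so user code rarely needs to spell them out.
      val consumed: Consumed[String, Long] = implicitly[Consumed[String, Long]]
      val produced: Produced[String, Long] = implicitly[Produced[String, Long]]
      val serialized: Serialized[String, Long] = implicitly[Serialized[String, Long]]

      // Tuples convert to KeyValue wherever one is expected.
      val pair: KeyValue[String, Long] = ("clicks", 42L)
    }
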
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
new file mode 100644
index 00000000000..06afcaeed5e
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.common.serialization.{Serde, Deserializer => JDeserializer, Serializer => JSerializer}
+
+trait ScalaSerde[T] extends Serde[T] {
+  override def deserializer(): JDeserializer[T]
+
+  override def serializer(): JSerializer[T]
+
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
+
+  override def close(): Unit = ()
+}
+
+trait SimpleScalaSerde[T >: Null] extends Serde[T] with ScalaSerde[T] {
+  def serialize(data: T): Array[Byte]
+  def deserialize(data: Array[Byte]): Option[T]
+
+  private def outerSerialize(data: T): Array[Byte] = serialize(data)
+  private def outerDeserialize(data: Array[Byte]): Option[T] = deserialize(data)
+
+  override def deserializer(): Deserializer[T] = new Deserializer[T] {
+    override def deserialize(data: Array[Byte]): Option[T] = outerDeserialize(data)
+  }
+
+  override def serializer(): Serializer[T] = new Serializer[T] {
+    override def serialize(data: T): Array[Byte] = outerSerialize(data)
+  }
+}
+
+trait Deserializer[T >: Null] extends JDeserializer[T] {
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
+
+  override def close(): Unit = ()
+
+  override def deserialize(topic: String, data: Array[Byte]): T =
+    Option(data).flatMap(deserialize).orNull
+
+  def deserialize(data: Array[Byte]): Option[T]
+}
+
+trait Serializer[T] extends JSerializer[T] {
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
+
+  override def close(): Unit = ()
+
+  override def serialize(topic: String, data: T): Array[Byte] =
+    Option(data).map(serialize).orNull
+
+  def serialize(data: T): Array[Byte]
+}
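
As an illustration of the traits above, a serde for a hypothetical application type only has to implement
the two abstract methods of SimpleScalaSerde; null handling is taken care of by the Serializer and
Deserializer traits (the type and encoding below are illustrative, not part of the patch):

    import java.nio.charset.StandardCharsets
    import org.apache.kafka.streams.scala.SimpleScalaSerde

    // A hypothetical application-level value type.
    case class UserId(value: String)

    object UserIdSerde extends SimpleScalaSerde[UserId] {
      override def serialize(data: UserId): Array[Byte] =
        data.value.getBytes(StandardCharsets.UTF_8)

      override def deserialize(data: Array[Byte]): Option[UserId] =
        Some(UserId(new String(data, StandardCharsets.UTF_8)))
    }

Declaring such a serde as an implicit value (e.g. implicit val userIdSerde: Serde[UserId] = UserIdSerde)
lets the conversions in ImplicitConversions derive Consumed, Produced and Serialized instances for the
custom type as well.
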
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
new file mode 100644
index 00000000000..9e6e204a372
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.regex.Pattern
+
+import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
+import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
+import org.apache.kafka.streams.state.StoreBuilder
+import org.apache.kafka.streams.{Consumed, StreamsBuilder => StreamsBuilderJ, Topology}
+
+import org.apache.kafka.streams.scala.kstream._
+import ImplicitConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
+  */
+class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
+
+  /**
+   * Create a [[kstream.KStream]] from the specified topic.
+   * <p>
+   * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`, 
+   * key and value deserializers etc. If the implicit is not found in scope, a compiler error will result.
+   * <p>
+   * A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
+   * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. 
+   * {{{
+   * // Brings all implicit conversions in scope
+   * import ImplicitConversions._
+   *
+   * // Bring implicit default serdes in scope
+   * import DefaultSerdes._
+   *
+   * val builder = new StreamsBuilder()
+   *
+   * // stream function gets the implicit Consumed which is constructed automatically
+   * // from the serdes through the implicits in ImplicitConversions#consumedFromSerde
+   * val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+   * }}}
+   *
+   * @param topic the topic name
+   * @return a [[kstream.KStream]] for the specified topic
+   */
+  def stream[K, V](topic: String)(implicit consumed: Consumed[K, V]): KStream[K, V] =
+    inner.stream[K, V](topic, consumed)
+
+  /**
+   * Create a [[kstream.KStream]] from the specified topics.
+   *
+   * @param topics the topic names
+   * @return a [[kstream.KStream]] for the specified topics
+   * @see #stream(String)
+   * @see `org.apache.kafka.streams.StreamsBuilder#stream`
+   */
+  def stream[K, V](topics: List[String])(implicit consumed: Consumed[K, V]): KStream[K, V] =
+    inner.stream[K, V](topics.asJava, consumed)
+
+  /**
+   * Create a [[kstream.KStream]] from the specified topic pattern.
+   *
+   * @param topicPattern the topic name pattern
+   * @return a [[kstream.KStream]] for the specified topics
+   * @see #stream(String)
+   * @see `org.apache.kafka.streams.StreamsBuilder#stream`
+   */
+  def stream[K, V](topicPattern: Pattern)(implicit consumed: Consumed[K, V]): KStream[K, V] =
+    inner.stream[K, V](topicPattern, consumed)
+
+  /**
+   * Create a [[kstream.KTable]] from the specified topic.
+   * <p>
+   * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`, 
+   * key and value deserializers etc. If the implicit is not found in scope, a compiler error will result.
+   * <p>
+   * A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
+   * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. 
+   * {{{
+   * // Brings all implicit conversions in scope
+   * import ImplicitConversions._
+   *
+   * // Bring implicit default serdes in scope
+   * import DefaultSerdes._
+   *
+   * val builder = new StreamsBuilder()
+   *
+   * // stream function gets the implicit Consumed which is constructed automatically
+   * // from the serdes through the implicits in ImplicitConversions#consumedFromSerde
+   * val userClicksStream: KTable[String, Long] = builder.table(userClicksTopic)
+   * }}}
+   *
+   * @param topic the topic name
+   * @return a [[kstream.KTable]] for the specified topic
+   * @see `org.apache.kafka.streams.StreamsBuilder#table`
+   */
+  def table[K, V](topic: String)(implicit consumed: Consumed[K, V]): KTable[K, V] =
+    inner.table[K, V](topic, consumed)
+
+  /**
+   * Create a [[kstream.KTable]] from the specified topic.
+   *
+   * @param topic the topic name
+   * @param materialized  the instance of `Materialized` used to materialize a state store
+   * @return a [[kstream.KTable]] for the specified topic
+   * @see #table(String)
+   * @see `org.apache.kafka.streams.StreamsBuilder#table`
+   */
+  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
+    (implicit consumed: Consumed[K, V]): KTable[K, V] =
+    inner.table[K, V](topic, consumed, materialized)
+
+  /**
+   * Create a `GlobalKTable` from the specified topic. The serializers from the implicit `Consumed`
+   * instance will be used. Input records with `null` key will be dropped.
+   *
+   * @param topic the topic name
+   * @return a `GlobalKTable` for the specified topic
+   * @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
+   */
+  def globalTable[K, V](topic: String)(implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
+    inner.globalTable(topic, consumed)
+
+  /**
+   * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized 
+   * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers 
+   * from the implicit `Consumed` instance will be used.
+   *
+   * @param topic the topic name
+   * @param materialized  the instance of `Materialized` used to materialize a state store
+   * @return a `GlobalKTable` for the specified topic
+   * @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
+   */
+  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
+    (implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
+    inner.globalTable(topic, consumed, materialized)
+
+  /**
+   * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`, 
+   * `Transformer`, or `ValueTransformer` before it can be used.
+   *
+   * @param builder the builder used to obtain the `StateStore` instance
+   * @return the underlying Java abstraction `StreamsBuilder` after adding the `StateStore`
+   * @throws TopologyException if state store supplier is already added
+   * @see `org.apache.kafka.streams.StreamsBuilder#addStateStore`
+   */
+  def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)
+
+  /**
+   * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`, `Transformer`,
+   * or `ValueTransformer` (in contrast to regular stores).
+   *
+   * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
+   */ 
+  def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
+                     topic: String,
+                     consumed: Consumed[_, _],
+                     stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilderJ =
+    inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
+
+  def build(): Topology = inner.build()
+}
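
A sketch of the builder in use, combining the implicitly derived Consumed with an explicitly configured
Materialized store (the topic, store and type names are illustrative, not part of the patch):

    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.scala._
    import org.apache.kafka.streams.scala.kstream._
    import org.apache.kafka.streams.scala.DefaultSerdes._
    import org.apache.kafka.streams.scala.ImplicitConversions._

    object StreamsBuilderSketch {
      val builder = new StreamsBuilder()

      // Consumed[String, Long] is derived from the implicit serdes in scope.
      val balances: KTable[String, Long] =
        builder.table[String, Long](
          "account-balances",
          Materialized.as[String, Long, ByteArrayKeyValueStore]("balances-store")
            .withKeySerde(stringSerde)
            .withValueSerde(longSerde))

      val topology = builder.build()
    }
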
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
new file mode 100644
index 00000000000..8f0ae93c149
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -0,0 +1,145 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+package kstream
+
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.FunctionConversions._
+
+
+/**
+ * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
+ *
+ * @tparam K Type of keys
+ * @tparam V Type of values
+ * @param inner The underlying Java abstraction for KGroupedStream
+ *
+ * @see `org.apache.kafka.streams.kstream.KGroupedStream`
+ */
+class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
+
+  /**
+   * Count the number of records in this stream by the grouped key.
+   *
+   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
+   * represent the latest (rolling) count (i.e., number of records) for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
+   */ 
+  def count(): KTable[K, Long] = {
+    val c: KTable[K, java.lang.Long] = inner.count()
+    c.mapValues[Long](Long2long _)
+  }
+
+  /**
+   * Count the number of records in this stream by the grouped key.
+   * The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view)
+   * provided by the given `materialized`.
+   *
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
+   * represent the latest (rolling) count (i.e., number of records) for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
+   */ 
+  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { 
+    val c: KTable[K, java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+    c.mapValues[Long](Long2long _)
+  }
+
+  /**
+   * Combine the values of records in this stream by the grouped key.
+   *
+   * @param reducer   a function `(V, V) => V` that computes a new aggregate result. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
+   */ 
+  def reduce(reducer: (V, V) => V): KTable[K, V] = {
+    inner.reduce(reducer.asReducer)
+  }
+
+  /**
+   * Combine the values of records in this stream by the grouped key.
+   *
+   * @param reducer   a function `(V, V) => V` that computes a new aggregate result. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
+   */ 
+  def reduce(reducer: (V, V) => V,
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+
+    // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
+    // works perfectly with Scala 2.12 though
+    inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, materialized)
+  }
+
+  /**
+   * Aggregate the values of records in this stream by the grouped key.
+   *
+   * @param initializer   an `Initializer` that computes an initial intermediate aggregation result
+   * @param aggregator    an `Aggregator` that computes a new aggregate result
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
+   */ 
+  def aggregate[VR](initializer: () => VR,
+    aggregator: (K, V, VR) => VR): KTable[K, VR] = {
+    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  }
+
+  /**
+   * Aggregate the values of records in this stream by the grouped key.
+   *
+   * @param initializer   an `Initializer` that computes an initial intermediate aggregation result
+   * @param aggregator    an `Aggregator` that computes a new aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
+   */ 
+  def aggregate[VR](initializer: () => VR,
+    aggregator: (K, V, VR) => VR,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
+  }
+
+  /**
+   * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
+   *
+   * @param windows the specification of the aggregation `SessionWindows`
+   * @return an instance of [[SessionWindowedKStream]]
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#windowedBy`
+   */
+  def windowedBy(windows: SessionWindows): SessionWindowedKStream[K, V] =
+    inner.windowedBy(windows)
+
+  /**
+   * Create a new [[TimeWindowedKStream]] instance that can be used to perform windowed aggregations.
+   *
+   * @param windows the specification of the aggregation `Windows`
+   * @return an instance of [[TimeWindowedKStream]]
+   * @see `org.apache.kafka.streams.kstream.KGroupedStream#windowedBy`
+   */
+  def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedKStream[K, V] =
+    inner.windowedBy(windows)
+}
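
A sketch of the grouped-stream operations above, written with plain function literals (the topic name and
types are illustrative, not part of the patch):

    import org.apache.kafka.streams.kstream.TimeWindows
    import org.apache.kafka.streams.scala._
    import org.apache.kafka.streams.scala.kstream._
    import org.apache.kafka.streams.scala.DefaultSerdes._
    import org.apache.kafka.streams.scala.ImplicitConversions._

    object KGroupedStreamSketch {
      val builder = new StreamsBuilder()

      // One record per click event, keyed by user id, valued by the number of clicks in the event.
      val clicks: KStream[String, Long] = builder.stream[String, Long]("user-clicks")

      // Serialized[String, Long] is derived from the implicit serdes.
      val grouped: KGroupedStream[String, Long] = clicks.groupByKey

      // Rolling sum of clicks per user.
      val totalClicks: KTable[String, Long] = grouped.reduce(_ + _)

      // Rolling number of click events per user, via an initializer and an aggregator.
      val clickEvents: KTable[String, Long] = grouped.aggregate[Long](() => 0L, (_, _, agg) => agg + 1L)

      // Five-minute tumbling windows over the same grouping.
      val windowed: TimeWindowedKStream[String, Long] = grouped.windowedBy(TimeWindows.of(5 * 60 * 1000L))
    }
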
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
new file mode 100644
index 00000000000..57c44fc8033
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+package kstream
+
+import org.apache.kafka.streams.kstream.{KGroupedTable => KGroupedTableJ, _}
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.FunctionConversions._
+
+/**
+ * Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object.
+ *
+ * @tparam K Type of keys
+ * @tparam V Type of values
+ * @param inner The underlying Java abstraction for KGroupedTable
+ *
+ * @see `org.apache.kafka.streams.kstream.KGroupedTable`
+ */
+class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
+
+  /**
+   * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
+   * the same key into a new instance of [[KTable]].
+   *
+   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
+   * represent the latest (rolling) count (i.e., number of records) for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
+   */ 
+  def count(): KTable[K, Long] = {
+    val c: KTable[K, java.lang.Long] = inner.count()
+    c.mapValues[Long](Long2long(_))
+  }
+
+  /**
+   * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
+   * the same key into a new instance of [[KTable]].
+   *
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
+   * represent the latest (rolling) count (i.e., number of records) for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
+   */ 
+  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] =
+    inner.count(materialized)
+
+  /**
+   * Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
+   * to the same key into a new instance of [[KTable]].
+   *
+   * @param adder      a function that adds a new value to the aggregate result
+   * @param subtractor a function that removes an old value from the aggregate result
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
+   */
+  def reduce(adder: (V, V) => V,
+             subtractor: (V, V) => V): KTable[K, V] = {
+
+    // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
+    // works perfectly with Scala 2.12 though
+    inner.reduce(adder.asReducer, subtractor.asReducer)
+  }
+
+  /**
+   * Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
+   * to the same key into a new instance of [[KTable]].
+   *
+   * @param adder      a function that adds a new value to the aggregate result
+   * @param subtractor a function that removes an old value from the aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
+   */
+  def reduce(adder: (V, V) => V,
+             subtractor: (V, V) => V,
+             materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+
+    // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
+    // works perfectly with Scala 2.12 though
+    inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
+  }
+
+  /**
+   * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
+   * to the same key into a new instance of [[KTable]] using default serializers and deserializers.
+   *
+   * @param initializer a function that provides an initial aggregate result value
+   * @param adder       a function that adds a new record to the aggregate result
+   * @param subtractor  an aggregator function that removes an old record from the aggregate result
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
+   */
+  def aggregate[VR](initializer: () => VR,
+                    adder: (K, V, VR) => VR,
+                    subtractor: (K, V, VR) => VR): KTable[K, VR] = {
+
+    inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator)
+  }
+
+  /**
+   * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
+   * to the same key into a new instance of [[KTable]] using default serializers and deserializers.
+   *
+   * @param initializer a function that provides an initial aggregate result value
+   * @param adder       a function that adds a new record to the aggregate result
+   * @param subtractor  an aggregator function that removes an old record from the aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
+   */
+  def aggregate[VR](initializer: () => VR,
+                    adder: (K, V, VR) => VR,
+                    subtractor: (K, V, VR) => VR,
+                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+
+    inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
+  }
+}
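
A sketch of the table re-aggregation above: the adder is applied for a record's new group and the
subtractor for its old group, which keeps the aggregate correct as table rows are updated or deleted. The
KGroupedTable is assumed to come from KTable#groupBy, which is outside this file; the names are
illustrative and not part of the patch:

    import org.apache.kafka.streams.scala.kstream.{KGroupedTable, KTable}

    object KGroupedTableSketch {
      // Total balance per region: add to the new region's sum, subtract from the old one.
      def balancePerRegion(grouped: KGroupedTable[String, Long]): KTable[String, Long] =
        grouped.reduce(_ + _, _ - _)

      // Number of accounts per region, via an initializer, an adder and a subtractor.
      def accountsPerRegion(grouped: KGroupedTable[String, Long]): KTable[String, Long] =
        grouped.aggregate[Long](() => 0L, (_, _, agg) => agg + 1L, (_, _, agg) => agg - 1L)
    }
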
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
new file mode 100644
index 00000000000..94c36add172
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -0,0 +1,581 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+package kstream
+
+import org.apache.kafka.streams.KeyValue
+import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.FunctionConversions._
+
+import scala.collection.JavaConverters._
+
+/**
+ * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object.
+ *
+ * @tparam K Type of keys
+ * @tparam V Type of values
+ * @param inner The underlying Java abstraction for KStream
+ *
+ * @see `org.apache.kafka.streams.kstream.KStream`
+ */
+class KStream[K, V](val inner: KStreamJ[K, V]) {
+
+  /**
+   * Create a new [[KStream]] that consists of all records of this stream which satisfy the given
+   * predicate
+   *
+   * @param predicate a filter that is applied to each record
+   * @return a [[KStream]] that contains only those records that satisfy the given predicate
+   * @see `org.apache.kafka.streams.kstream.KStream#filter`
+   */ 
+  def filter(predicate: (K, V) => Boolean): KStream[K, V] = {
+    inner.filter(predicate.asPredicate)
+  }
+
+  /**
+   * Create a new [[KStream]] that consists of all records of this stream which do <em>not</em> satisfy the given
+   * predicate
+   *
+   * @param predicate a filter that is applied to each record
+   * @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate
+   * @see `org.apache.kafka.streams.kstream.KStream#filterNot`
+   */ 
+  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] = {
+    inner.filterNot(predicate.asPredicate)
+  }
+
+  /**
+   * Set a new key (with possibly new type) for each input record.
+   * <p>
+   * The function `mapper` passed is applied to every record and results in the generation of a new
+   * key `KR`. The function outputs a new [[KStream]] where each record has this new key.
+   *
+   * @param mapper a function `(K, V) => KR` that computes a new key for each record
+   * @return a [[KStream]] that contains records with new key (possibly of different type) and unmodified value
+   * @see `org.apache.kafka.streams.kstream.KStream#selectKey`
+   */ 
+  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+    inner.selectKey[KR](mapper.asKeyValueMapper)
+  }
+
+  /**
+   * Transform each record of the input stream into a new record in the output stream (both key and value type can be
+   * altered arbitrarily).
+   * <p>
+   * The provided `mapper`, a function `(K, V) => (KR, VR)` is applied to each input record and computes a new output record.
+   *
+   * @param mapper a function `(K, V) => (KR, VR)` that computes a new output record
+   * @return a [[KStream]] that contains records with new key and value (possibly both of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#map`
+   */ 
+  def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = {
+    val kvMapper = mapper.tupled andThen tuple2ToKeyValue
+    inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
+  }
+
+  /**
+   * Transform the value of each input record into a new value (with possible new type) of the output record.
+   * <p>
+   * The provided `mapper`, a function `V => VR` is applied to each input record value and computes a new value for it
+   *
+   * @param mapper a function `V => VR` that computes a new output value
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
+   */ 
+  def mapValues[VR](mapper: V => VR): KStream[K, VR] = {
+    inner.mapValues[VR](mapper.asValueMapper)
+  }
+
+  /**
+   * Transform the value of each input record into a new value (with possible new type) of the output record.
+   * <p>
+   * The provided `mapper`, a function `(K, V) => VR` is applied to each input record value and computes a new value for it
+   *
+   * @param mapper a function `(K, V) => VR` that computes a new output value
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
+   */ 
+  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] = {
+    inner.mapValues[VR](mapper.asValueMapperWithKey)
+  }
+
+  /**
+   * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+   * can be altered arbitrarily).
+   * <p>
+   * The provided `mapper`, function `(K, V) => Iterable[(KR, VR)]` is applied to each input record and computes zero or more output records.
+   *
+   * @param mapper function `(K, V) => Iterable[(KR, VR)]` that computes the new output records
+   * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#flatMap`
+   */ 
+  def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] = {
+    val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue).asJava)
+    inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k , v)).asKeyValueMapper)
+  }
+
+  /**
+   * Create a new [[KStream]] by transforming the value of each record in this stream into zero or more values
+   * with the same key in the new stream.
+   * <p>
+   * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+   * stream (value type can be altered arbitrarily).
+   * The provided `mapper`, a function `V => Iterable[VR]` is applied to each input record and computes zero or more output values.
+   *
+   * @param mapper a function `V => Iterable[VR]` that computes the new output values
+   * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
+   * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
+   */ 
+  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] = {
+    inner.flatMapValues[VR](mapper.asValueMapper)
+  }
+
+  /**
+   * Create a new [[KStream]] by transforming the value of each record in this stream into zero or more values
+   * with the same key in the new stream.
+   * <p>
+   * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+   * stream (value type can be altered arbitrarily).
+   * The provided `mapper`, a function `(K, V) => Iterable[VR]` is applied to each input record and computes zero or more output values.
+   *
+   * @param mapper a function `(K, V) => Iterable[VR]` that computes the new output values
+   * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
+   * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
+   */ 
+  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] = {
+    inner.flatMapValues[VR](mapper.asValueMapperWithKey)
+  }
+
+  /**
+   * Print the records of this KStream using the options provided by `Printed`
+   *
+   * @param printed options for printing
+   * @see `org.apache.kafka.streams.kstream.KStream#print`
+   */
+  def print(printed: Printed[K, V]): Unit = inner.print(printed)
+
+  /**
+   * Perform an action on each record of `KStream`
+   *
+   * @param action an action to perform on each record
+   * @see `org.apache.kafka.streams.kstream.KStream#foreach`
+   */
+  def foreach(action: (K, V) => Unit): Unit = {
+    inner.foreach((k: K, v: V) => action(k, v))
+  }
+
+  /**
+   * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+   * the supplied predicates.
+   *
+   * @param predicates the ordered list of functions that return a Boolean
+   * @return multiple distinct substreams of this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#branch`
+   */
+  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] = {
+    inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
+  }
+
+  /**
+   * Materialize this stream to a topic and create a new [[KStream]] from the topic, using the `Produced` instance for
+   * configuration of the key serde, value serde, and `StreamPartitioner`.
+   * <p>
+   * The user can either supply the `Produced` instance as an implicit in scope, or provide implicit
+   * key and value serdes that will be converted to a `Produced` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import DefaultSerdes._
+   *
+   * //..
+   * val clicksPerRegion: KTable[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Produced instance, which
+   * // will be passed automatically to the call of through below
+   * clicksPerRegion.through(topic)
+   *
+   * // Similarly you can create an implicit Produced and it will be passed implicitly
+   * // to the through call
+   * }}}
+   *
+   * @param topic the topic name
+   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#through`
+   */
+  def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
+    inner.through(topic, produced)
+
+  /**
+   * Materialize this stream to a topic, using the `Produced` instance for
+   * configuration of the key serde, value serde, and `StreamPartitioner`.
+   * <p>
+   * The user can either supply the `Produced` instance as an implicit in scope, or provide implicit
+   * key and value serdes that will be converted to a `Produced` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import DefaultSerdes._
+   *
+   * //..
+   * val clicksPerRegion: KTable[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Produced instance, which
+   * // will be passed automatically to the call of `to` below
+   * clicksPerRegion.to(topic)
+   *
+   * // Similarly you can create an implicit Produced and it will be passed implicitly
+   * // to the `to` call
+   * }}}
+   *
+   * @param topic the topic name
+   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @see `org.apache.kafka.streams.kstream.KStream#to`
+   */
+  def to(topic: String)(implicit produced: Produced[K, V]): Unit =
+    inner.to(topic, produced)
+
+  /**
+   * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+   * can be altered arbitrarily).
+   * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record and
+   * computes zero or more output records. In order to use state, the state store must be created and registered
+   * beforehand via `addStateStore` or `addGlobalStore` before it can be connected to the `Transformer`.
+   *
+   * @param transformerSupplier an instance of `TransformerSupplier` that generates a `Transformer`
+   * @param stateStoreNames     the names of the state stores used by the processor
+   * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#transform`
+   */ 
+  def transform[K1, V1](transformerSupplier: Transformer[K, V, (K1, V1)],
+    stateStoreNames: String*): KStream[K1, V1] = {
+    val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new TransformerSupplier[K, V, KeyValue[K1, V1]] {
+      override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
+        new Transformer[K, V, KeyValue[K1, V1]] {
+          override def transform(key: K, value: V): KeyValue[K1, V1] = {
+            transformerSupplier.transform(key, value) match {
+              case (k1, v1) => KeyValue.pair(k1, v1)
+              case _ => null
+            }
+          }
+
+          override def init(context: ProcessorContext): Unit = transformerSupplier.init(context)
+
+          @deprecated ("Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead", "0.1.3") // scalastyle:ignore
+          override def punctuate(timestamp: Long): KeyValue[K1, V1] = {
+            transformerSupplier.punctuate(timestamp) match {
+              case (k1, v1) => KeyValue.pair[K1, V1](k1, v1)
+              case _ => null
+            }
+          }
+
+          override def close(): Unit = transformerSupplier.close()
+        }
+      }
+    }
+    inner.transform(transformerSupplierJ, stateStoreNames: _*)
+  }
+
+  /**
+   * Transform the value of each input record into a new value (with possible new type) of the output record.
+   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
+   * record value and computes a new value for it.
+   * In order to use state, the state store must be created and registered
+   * beforehand via `addStateStore` or `addGlobalStore` before it can be connected to the `ValueTransformer`.
+   *
+   * @param valueTransformerSupplier an instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
+   */ 
+  def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
+                          stateStoreNames: String*): KStream[K, VR] = {
+    inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
+  }
+
+  /**
+   * Transform the value of each input record into a new value (with possible new type) of the output record.
+   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
+   * record value and computes a new value for it.
+   * In order to use state, the state store must be created and registered
+   * beforehand via `addStateStore` or `addGlobalStore` before it can be connected to the `ValueTransformerWithKey`.
+   *
+   * @param valueTransformerWithKeySupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
+   */ 
+  def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+                          stateStoreNames: String*): KStream[K, VR] = {
+    inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
+  }
+
+  /**
+   * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
+   * `processorSupplier`).
+   * In order to assign a state store, the state store must be created and registered
+   * beforehand via `addStateStore` or `addGlobalStore` before it can be connected to the `Processor`
+   *
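+   * <p>
+   * A minimal usage sketch (the printing side effect is an illustrative assumption):
+   * {{{
+   * textStream.process(() => new Processor[String, String] {
+   *   override def init(context: ProcessorContext): Unit = ()
+   *   override def process(key: String, value: String): Unit = println(key + " -> " + value)
+   *   override def punctuate(timestamp: Long): Unit = ()
+   *   override def close(): Unit = ()
+   * })
+   * }}}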
+   * @param processorSupplier a function that generates a `org.apache.kafka.streams.processor.Processor`
+   * @param stateStoreNames   the names of the state stores used by the processor
+   * @see `org.apache.kafka.streams.kstream.KStream#process`
+   */ 
+  def process(processorSupplier: () => Processor[K, V],
+    stateStoreNames: String*): Unit = {
+
+    val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
+      override def get(): Processor[K, V] = processorSupplier()
+    }
+    inner.process(processorSupplierJ, stateStoreNames: _*)
+  }
+
+  /**
+   * Group the records by their current key into a [[KGroupedStream]] 
+   * <p>
+   * The user can either supply the `Serialized` instance as an implicit in scope or provide implicit
+   * serdes that will be converted to a `Serialized` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import DefaultSerdes._
+   *
+   * val clicksPerRegion: KTable[String, Long] =
+   *   userClicksStream
+   *     .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+   *     .map((_, regionWithClicks) => regionWithClicks)
+   *
+   *     // the groupByKey gets the Serialized instance through an implicit conversion of the
+   *     // serdes brought into scope through the import DefaultSerdes._ above
+   *     .groupByKey
+   *     .reduce(_ + _)
+   *
+   * // Similarly you can create an implicit Serialized and it will be passed implicitly
+   * // to the groupByKey call
+   * }}}
+   *
+   * @param serialized an implicit `Serialized` instance that provides the serdes
+   * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
+   */ 
+  def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] =
+    inner.groupByKey(serialized)
+
+  /**
+   * Group the records of this [[KStream]] on a new key that is selected using the provided key transformation function
+   * and the `Serialized` instance.
+   * <p>
+   * The user can either supply the `Serialized` instance as an implicit in scope or provide implicit
+   * serdes that will be converted to a `Serialized` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import DefaultSerdes._
+   *
+   * val textLines = streamBuilder.stream[String, String](inputTopic)
+   *
+   * val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+   *
+   * val wordCounts: KTable[String, Long] =
+   *   textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+   *
+   *     // the groupBy gets the Serialized instance through an implicit conversion of the
+   *     // serdes brought into scope through the import DefaultSerdes._ above
+   *     .groupBy((k, v) => v)
+   *
+   *     .count()
+   * }}}
+   *
+   * @param selector a function that computes a new key for grouping
+   * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#groupBy`
+   */ 
+  def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] =
+    inner.groupBy(selector.asKeyValueMapper, serialized)
+
+  /**
+   * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with 
+   * serializers and deserializers supplied by the implicit `Joined` instance.
+   *
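+   * <p>
+   * A minimal usage sketch (the stream contents and the 5-minute window are illustrative assumptions):
+   * {{{
+   * // serdes for the key and both values in implicit scope, e.g. via `import DefaultSerdes._`
+   * val clicksWithRegion: KStream[String, (Long, String)] =
+   *   clicksStream.join(regionsStream,
+   *     (clicks: Long, region: String) => (clicks, region),
+   *     JoinWindows.of(5 * 60 * 1000L))
+   * }}}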
+   * @param otherStream the [[KStream]] to be joined with this stream
+   * @param joiner      a function that computes the join result for a pair of matching records
+   * @param windows     the specification of the `JoinWindows`
+   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   *                    inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
+   *                    key serde, value serde and other value serde in implicit scope and they will be
+   *                    converted to the instance of `Joined` through implicit conversion
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   * one for each matched record-pair with the same key and within the joining window intervals
+   * @see `org.apache.kafka.streams.kstream.KStream#join`
+   */ 
+  def join[VO, VR](otherStream: KStream[K, VO],
+    joiner: (V, VO) => VR,
+    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+      inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+
+  /**
+   * Join records of this stream with another [[KTable]]'s records using inner equi join with 
+   * serializers and deserializers supplied by the implicit `Joined` instance.
+   *
+   * @param table    the [[KTable]] to be joined with this stream
+   * @param joiner   a function that computes the join result for a pair of matching records
+   * @param joined   an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   *                 inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
+   *                 key serde, value serde and other value serde in implicit scope and they will be
+   *                 converted to the instance of `Joined` through implicit conversion
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   * one for each matched record-pair with the same key 
+   * @see `org.apache.kafka.streams.kstream.KStream#join`
+   */ 
+  def join[VT, VR](table: KTable[K, VT],
+    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
+      inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
+
+  /**
+   * Join records of this stream with `GlobalKTable`'s records using non-windowed inner equi join.
+   *
+   * @param globalKTable   the `GlobalKTable` to be joined with this stream
+   * @param keyValueMapper a function used to map from the (key, value) of this stream
+   *                       to the key of the `GlobalKTable`
+   * @param joiner         a function that computes the join result for a pair of matching records
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   *                       one output for each input [[KStream]] record
+   * @see `org.apache.kafka.streams.kstream.KStream#join`
+   */ 
+  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+    keyValueMapper: (K, V) => GK,
+    joiner: (V, GV) => RV): KStream[K, RV] =
+      inner.join[GK, GV, RV](
+        globalKTable,
+        ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
+        ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
+      )
+
+  /**
+   * Join records of this stream with another [[KStream]]'s records using windowed left equi join with 
+   * serializers and deserializers supplied by the implicit `Joined` instance.
+   *
+   * @param otherStream the [[KStream]] to be joined with this stream
+   * @param joiner      a function that computes the join result for a pair of matching records
+   * @param windows     the specification of the `JoinWindows`
+   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   *                    inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
+   *                    key serde, value serde and other value serde in implicit scope and they will be
+   *                    converted to the instance of `Joined` through implicit conversion
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   *                    one for each matched record-pair with the same key and within the joining window intervals
+   * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
+   */ 
+  def leftJoin[VO, VR](otherStream: KStream[K, VO],
+    joiner: (V, VO) => VR,
+    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+      inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+
+  /**
+   * Join records of this stream with another [[KTable]]'s records using left equi join with 
+   * serializers and deserializers supplied by the implicit `Joined` instance.
+   *
+   * @param table    the [[KTable]] to be joined with this stream
+   * @param joiner   a function that computes the join result for a pair of matching records
+   * @param joined   an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   *                 inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
+   *                 key serde, value serde and other value serde in implicit scope and they will be
+   *                 converted to the instance of `Joined` through implicit conversion
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   *                 one for each matched record-pair with the same key 
+   * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
+   */ 
+  def leftJoin[VT, VR](table: KTable[K, VT],
+    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
+      inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
+
+  /**
+   * Join records of this stream with `GlobalKTable`'s records using non-windowed left equi join.
+   *
+   * @param globalKTable   the `GlobalKTable` to be joined with this stream
+   * @param keyValueMapper a function used to map from the (key, value) of this stream
+   *                       to the key of the `GlobalKTable`
+   * @param joiner         a function that computes the join result for a pair of matching records
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   *                       one output for each input [[KStream]] record
+   * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
+   */ 
+  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+    keyValueMapper: (K, V) => GK,
+    joiner: (V, GV) => RV): KStream[K, RV] = {
+
+    inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, joiner.asValueJoiner)
+  }
+
+  /**
+   * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with 
+   * serializers and deserializers supplied by the implicit `Joined` instance.
+   *
+   * @param otherStream the [[KStream]] to be joined with this stream
+   * @param joiner      a function that computes the join result for a pair of matching records
+   * @param windows     the specification of the `JoinWindows`
+   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   *                    inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
+   *                    key serde, value serde and other value serde in implicit scope and they will be
+   *                    converted to the instance of `Joined` through implicit conversion
+   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
+   * one for each matched record-pair with the same key and within the joining window intervals
+   * @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
+   */ 
+  def outerJoin[VO, VR](otherStream: KStream[K, VO],
+    joiner: (V, VO) => VR,
+    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+      inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+
+  /**
+   * Merge this stream and the given stream into one larger stream.
+   * <p>
+   * There is no ordering guarantee between records from this `KStream` and records from the provided `KStream` 
+   * in the merged stream. Relative order is preserved within each input stream though (i.e., records within 
+   * one input stream are processed in order).
+   *
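+   * <p>
+   * A minimal usage sketch (the stream names are illustrative assumptions):
+   * {{{
+   * val allClicks: KStream[String, Long] = mobileClicks.merge(webClicks)
+   * }}}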
+   * @param stream a stream which is to be merged into this stream
+   * @return a merged stream containing all records from this and the provided [[KStream]]
+   * @see `org.apache.kafka.streams.kstream.KStream#merge`
+   */
+  def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)
+
+  /**
+   * Perform an action on each record of this [[KStream]].
+   * <p>
+   * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
+   * and returns an unchanged stream.
+   *
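+   * <p>
+   * A minimal usage sketch (the logging side effect is an illustrative assumption):
+   * {{{
+   * val observed: KStream[String, Long] = clicks.peek((user, count) => println(user + " -> " + count))
+   * }}}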
+   * @param action an action to perform on each record
+   * @see `org.apache.kafka.streams.kstream.KStream#peek`
+   */
+  def peek(action: (K, V) => Unit): KStream[K, V] = {
+    inner.peek((k: K, v: V) => action(k, v))
+  }
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
new file mode 100644
index 00000000000..0369ee5aa58
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -0,0 +1,292 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+package kstream
+
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.FunctionConversions._
+
+/**
+ * Wraps the Java class [[org.apache.kafka.streams.kstream.KTable]] and delegates method calls to the underlying Java object.
+ *
+ * @tparam K Type of keys
+ * @tparam V Type of values
+ * @param inner The underlying Java abstraction for KTable
+ *
+ * @see `org.apache.kafka.streams.kstream.KTable`
+ */
+class KTable[K, V](val inner: KTableJ[K, V]) {
+
+  /**
+   * Create a new [[KTable]] that consists of all records of this [[KTable]] which satisfy the given
+   * predicate.
+   *
+   * @param predicate a filter that is applied to each record
+   * @return a [[KTable]] that contains only those records that satisfy the given predicate
+   * @see `org.apache.kafka.streams.kstream.KTable#filter`
+   */ 
+  def filter(predicate: (K, V) => Boolean): KTable[K, V] = {
+    inner.filter(predicate.asPredicate)
+  }
+
+  /**
+   * Create a new [[KTable]] that consists of all records of this [[KTable]] which satisfy the given
+   * predicate.
+   *
+   * @param predicate a filter that is applied to each record
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains only those records that satisfy the given predicate
+   * @see `org.apache.kafka.streams.kstream.KTable#filter`
+   */ 
+  def filter(predicate: (K, V) => Boolean,
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+    inner.filter(predicate.asPredicate, materialized)
+  }
+
+  /**
+   * Create a new [[KTable]] that consists of all records of this [[KTable]] which do <em>not</em> satisfy the given
+   * predicate.
+   *
+   * @param predicate a filter that is applied to each record
+   * @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
+   * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
+   */ 
+  def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = {
+    inner.filterNot(predicate.asPredicate)
+  }
+
+  /**
+   * Create a new [[KTable]] that consists of all records of this [[KTable]] which do <em>not</em> satisfy the given
+   * predicate.
+   *
+   * @param predicate a filter that is applied to each record
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
+   * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
+   */ 
+  def filterNot(predicate: (K, V) => Boolean,
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+    inner.filterNot(predicate.asPredicate, materialized)
+  }
+
+  /**
+   * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
+   * (with possible new type) in the new [[KTable]].
+   * <p>
+   * The provided `mapper`, a function `V => VR`, is applied to each input record value and computes a new value for it
+   *
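+   * <p>
+   * A minimal usage sketch (the table name and types are illustrative assumptions):
+   * {{{
+   * val regionNameLengths: KTable[String, Int] = userRegionsTable.mapValues(_.length)
+   * }}}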
+   * @param mapper a function `V => VR` that computes a new output value
+   * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
+   */ 
+  def mapValues[VR](mapper: V => VR): KTable[K, VR] = {
+    inner.mapValues[VR](mapper.asValueMapper)
+  }
+
+  /**
+   * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
+   * (with possible new type) in the new [[KTable]].
+   * <p>
+   * The provided `mapper`, a function `V => VR`, is applied to each input record value and computes a new value for it
+   *
+   * @param mapper a function `V => VR` that computes a new output value
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
+   */ 
+  def mapValues[VR](mapper: V => VR,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+    inner.mapValues[VR](mapper.asValueMapper, materialized)
+  }
+
+  /**
+   * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
+   * (with possible new type) in the new [[KTable]].
+   * <p>
+   * The provided `mapper`, a function `(K, V) => VR`, is applied to each input record value and computes a new value for it
+   *
+   * @param mapper a function `(K, V) => VR` that computes a new output value
+   * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
+   */ 
+  def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] = {
+    inner.mapValues[VR](mapper.asValueMapperWithKey)
+  }
+
+  /**
+   * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
+   * (with possible new type) in the new [[KTable]].
+   * <p>
+   * The provided `mapper`, a function `(K, V) => VR`, is applied to each input record value and computes a new value for it
+   *
+   * @param mapper a function `(K, V) => VR` that computes a new output value
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
+   */ 
+  def mapValues[VR](mapper: (K, V) => VR,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+    inner.mapValues[VR](mapper.asValueMapperWithKey, materialized)
+  }
+
+  /**
+   * Convert this changelog stream to a [[KStream]].
+   *
+   * @return a [[KStream]] that contains the same records as this [[KTable]]
+   * @see `org.apache.kafka.streams.kstream.KTable#toStream`
+   */
+  def toStream: KStream[K, V] = inner.toStream
+
+  /**
+   * Convert this changelog stream to a [[KStream]] using the given key/value mapper to select the new key
+   *
+   * @param mapper a function that computes a new key for each record
+   * @return a [[KStream]] that contains the same records as this [[KTable]]
+   * @see `org.apache.kafka.streams.kstream.KTable#toStream`
+   */
+  def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+    inner.toStream[KR](mapper.asKeyValueMapper)
+  }
+
+  /**
+   * Re-groups the records of this [[KTable]] using the provided key/value mapper
+   * and `Serde`s as specified by `Serialized`.
+   *
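+   * <p>
+   * A minimal usage sketch (assuming implicit serdes are in scope, e.g. via `import DefaultSerdes._`;
+   * the table name is an illustrative assumption):
+   * {{{
+   * // re-key a (user -> (region, clicks)) table by region before aggregating
+   * val groupedByRegion: KGroupedTable[String, Long] =
+   *   clicksWithRegionTable.groupBy((user, regionWithClicks) => regionWithClicks)
+   * }}}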
+   * @param selector      a function that computes a new grouping key and value to be aggregated
+   * @param serialized    the `Serialized` instance used to specify `Serdes`
+   * @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
+   * @see `org.apache.kafka.streams.kstream.KTable#groupBy`
+   */ 
+  def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] = {
+    inner.groupBy(selector.asKeyValueMapper, serialized)
+  }
+
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
+   *
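+   * <p>
+   * A minimal usage sketch (the table names and types are illustrative assumptions):
+   * {{{
+   * // combine clicks with the user's region; both tables are keyed by user id
+   * val clicksWithRegion: KTable[String, (Long, String)] =
+   *   userClicksTable.join(userRegionsTable, (clicks: Long, region: String) => (clicks, region))
+   * }}}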
+   * @param other  the other [[KTable]] to be joined with this [[KTable]]
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   * one for each matched record-pair with the same key
+   * @see `org.apache.kafka.streams.kstream.KTable#join`
+   */ 
+  def join[VO, VR](other: KTable[K, VO],
+    joiner: (V, VO) => VR): KTable[K, VR] = {
+
+    inner.join[VO, VR](other.inner, joiner.asValueJoiner)
+  }
+
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
+   *
+   * @param other  the other [[KTable]] to be joined with this [[KTable]]
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   * one for each matched record-pair with the same key
+   * @see `org.apache.kafka.streams.kstream.KTable#join`
+   */ 
+  def join[VO, VR](other: KTable[K, VO],
+    joiner: (V, VO) => VR,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+
+    inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+  }
+
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
+   *
+   * @param other  the other [[KTable]] to be joined with this [[KTable]]
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   * one for each matched record-pair with the same key
+   * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
+   */ 
+  def leftJoin[VO, VR](other: KTable[K, VO],
+    joiner: (V, VO) => VR): KTable[K, VR] = {
+
+    inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
+  }
+
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
+   *
+   * @param other  the other [[KTable]] to be joined with this [[KTable]]
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   * one for each matched record-pair with the same key
+   * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
+   */ 
+  def leftJoin[VO, VR](other: KTable[K, VO],
+    joiner: (V, VO) => VR,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+
+    inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+  }
+
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
+   *
+   * @param other  the other [[KTable]] to be joined with this [[KTable]]
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   * one for each matched record-pair with the same key
+   * @see `org.apache.kafka.streams.kstream.KTable#outerJoin`
+   */ 
+  def outerJoin[VO, VR](other: KTable[K, VO],
+    joiner: (V, VO) => VR): KTable[K, VR] = {
+
+    inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
+  }
+
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
+   *
+   * @param other  the other [[KTable]] to be joined with this [[KTable]]
+   * @param joiner a function that computes the join result for a pair of matching records
+   * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
+   *                      should be materialized. 
+   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
+   * one for each matched record-pair with the same key
+   * @see `org.apache.kafka.streams.kstream.KTable#outerJoin`
+   */ 
+  def outerJoin[VO, VR](other: KTable[K, VO],
+    joiner: (V, VO) => VR,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = {
+
+    inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+  }
+
+  /**
+   * Get the name of the local state store that can be used to query this [[KTable]].
+   *
+   * @return the underlying state store name, or `null` if this [[KTable]] cannot be queried.
+   */
+  def queryableStoreName: String =
+    inner.queryableStoreName
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
new file mode 100644
index 00000000000..7e9fa0713fe
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+package kstream
+
+import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
+import org.apache.kafka.streams.state.SessionStore
+import org.apache.kafka.common.utils.Bytes
+
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.FunctionConversions._
+
+/**
+ * Wraps the Java class SessionWindowedKStream and delegates method calls to the underlying Java object.
+ *
+ * @tparam K Type of keys
+ * @tparam V Type of values
+ * @param inner The underlying Java abstraction for SessionWindowedKStream
+ *
+ * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream`
+ */
+class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
+
+  /**
+   * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
+   *
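+   * <p>
+   * A minimal usage sketch (the stream name and types are illustrative assumptions):
+   * {{{
+   * // sum click counts per user session
+   * val clicksPerSession: KTable[Windowed[String], Long] =
+   *   sessionizedClicks.aggregate(
+   *     () => 0L,
+   *     (user: String, clicks: Long, agg: Long) => agg + clicks,
+   *     (user: String, agg1: Long, agg2: Long) => agg1 + agg2)
+   * }}}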
+   * @param initializer    the initializer function
+   * @param aggregator     the aggregator function
+   * @param merger         the merger function
+   * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent
+   * the latest (rolling) aggregate for each key within a window
+   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
+   */
+  def aggregate[VR](initializer: () => VR,
+    aggregator: (K, V, VR) => VR,
+    merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] = {
+
+    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger)
+  }
+
+  /**
+   * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
+   *
+   * @param initializer    the initializer function
+   * @param aggregator     the aggregator function
+   * @param merger         the merger function
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent
+   * the latest (rolling) aggregate for each key within a window
+   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
+   */
+  def aggregate[VR](initializer: () => VR,
+    aggregator: (K, V, VR) => VR,
+    merger: (K, VR, VR) => VR,
+    materialized: Materialized[K, VR, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], VR] = {
+
+    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
+  }
+
+  /**
+   * Count the number of records in this stream by the grouped key into `SessionWindows`.
+   *
+   * @return a windowed [[KTable]] that contains "update" records with unmodified keys and `Long` values
+   * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
+   */
+  def count(): KTable[Windowed[K], Long] = {
+    val c: KTable[Windowed[K], java.lang.Long] = inner.count()
+    c.mapValues[Long](Long2long(_))
+  }
+
+  /**
+   * Count the number of records in this stream by the grouped key into `SessionWindows`.
+   *
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a windowed [[KTable]] that contains "update" records with unmodified keys and `Long` values
+   * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
+   */
+  def count(materialized: Materialized[K, Long, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] = {
+    val c: KTable[Windowed[K], java.lang.Long] =
+      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, SessionStore[Bytes, Array[Byte]]]])
+    c.mapValues[Long](Long2long(_))
+  }
+
+  /**
+   * Combine values of this stream by the grouped key into `SessionWindows`.
+   *
+   * @param reducer           a reducer function that computes a new aggregate result. 
+   * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent
+   * the latest (rolling) aggregate for each key within a window
+   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
+   */
+  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = {
+    inner.reduce(reducer.asReducer)
+  }
+
+  /**
+   * Combine values of this stream by the grouped key into `SessionWindows`.
+   *
+   * @param reducer           a reducer function that computes a new aggregate result. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent
+   * the latest (rolling) aggregate for each key within a window
+   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
+   */
+  def reduce(reducer: (V, V) => V,
+    materialized: Materialized[K, V, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], V] = {
+    inner.reduce(reducer.asReducer, materialized)
+  }
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
new file mode 100644
index 00000000000..1aa19786d5a
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+package kstream
+
+import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
+import org.apache.kafka.streams.state.WindowStore
+import org.apache.kafka.common.utils.Bytes
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.FunctionConversions._
+
+/**
+ * Wraps the Java class TimeWindowedKStream and delegates method calls to the underlying Java object.
+ *
+ * @tparam K Type of keys
+ * @tparam V Type of values
+ * @param inner The underlying Java abstraction for TimeWindowedKStream
+ *
+ * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream`
+ */
+class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
+
+  /**
+   * Aggregate the values of records in this stream by the grouped key.
+   *
+   * @param initializer   an initializer function that computes an initial intermediate aggregation result
+   * @param aggregator    an aggregator function that computes a new aggregate result
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
+   */
+  def aggregate[VR](initializer: () => VR,
+    aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] = {
+
+    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  }
+
+  /**
+   * Aggregate the values of records in this stream by the grouped key.
+   *
+   * @param initializer   an initializer function that computes an initial intermediate aggregation result
+   * @param aggregator    an aggregator function that computes a new aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
+   */
+  def aggregate[VR](initializer: () => VR,
+    aggregator: (K, V, VR) => VR,
+    materialized: Materialized[K, VR, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], VR] = {
+
+    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
+  }
+
+  /**
+   * Count the number of records in this stream by the grouped key and the defined windows.
+   *
+   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
+   * represent the latest (rolling) count (i.e., number of records) for each key
+   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
+   */ 
+  def count(): KTable[Windowed[K], Long] = {
+    val c: KTable[Windowed[K], java.lang.Long] = inner.count()
+    c.mapValues[Long](Long2long(_))
+  }
+
+  /**
+   * Count the number of records in this stream by the grouped key and the defined windows.
+   *
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
+   * represent the latest (rolling) count (i.e., number of records) for each key
+   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
+   */ 
+  def count(materialized: Materialized[K, Long, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] = {
+    val c: KTable[Windowed[K], java.lang.Long] = 
+      inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, WindowStore[Bytes, Array[Byte]]]])
+    c.mapValues[Long](Long2long(_))
+  }
+
+  /**
+   * Combine the values of records in this stream by the grouped key.
+   *
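+   * <p>
+   * A minimal usage sketch (the stream name and types are illustrative assumptions):
+   * {{{
+   * // sum click counts per window
+   * val clicksPerWindow: KTable[Windowed[String], Long] = windowedClicks.reduce(_ + _)
+   * }}}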
+   * @param reducer   a function that computes a new aggregate result
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
+   */
+  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] = {
+    inner.reduce(reducer.asReducer)
+  }
+
+  /**
+   * Combine the values of records in this stream by the grouped key.
+   *
+   * @param reducer   a function that computes a new aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
+   * latest (rolling) aggregate for each key
+   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
+   */
+  def reduce(reducer: (V, V) => V,
+    materialized: Materialized[K, V, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], V] = {
+
+    inner.reduce(reducer.asReducer, materialized)
+  }
+}
diff --git a/streams/streams-scala/src/test/resources/log4j.properties b/streams/streams-scala/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..93ffc165654
--- /dev/null
+++ b/streams/streams-scala/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+#   Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+#   Copyright (C) 2017-2018 Alexis Seigneurin.
+#  
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements. See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License. You may obtain a copy of the License at
+#  
+#      http://www.apache.org/licenses/LICENSE-2.0
+#  
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# Set root logger level to INFO and its only appender to R.
+log4j.rootLogger=INFO, R
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=logs/kafka-streams-scala.log
+
+log4j.appender.R.MaxFileSize=100KB
+# Keep one backup file
+log4j.appender.R.MaxBackupIndex=1
+
+# R uses PatternLayout.
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
new file mode 100644
index 00000000000..24974c40f72
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -0,0 +1,237 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Assert._
+import org.junit.rules.TemporaryFolder
+import org.junit._
+
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.test.TestUtils
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.scala.kstream._
+
+import ImplicitConversions._
+import com.typesafe.scalalogging.LazyLogging
+
+/**
+ * Test suite that demonstrates stream-table joins in Kafka Streams
+ * <p>
+ * The suite contains the test case using Scala APIs `testShouldCountClicksPerRegion` and the same test case using the
+ * Java APIs `testShouldCountClicksPerRegionJava`. The idea is to demonstrate that both generate the same result.
+ * <p>
+ * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
+ * Hence the native Java API based version is more verbose.
+ */ 
+class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
+  with StreamToTableJoinTestData with LazyLogging {
+
+  private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
+
+  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
+
+  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
+  val mockTime: MockTime = cluster.time
+  mockTime.setCurrentTimeMs(alignedTime)
+
+  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
+  @Rule def testFolder: TemporaryFolder = tFolder
+
+  @Before
+  def startKafkaCluster(): Unit = {
+    cluster.createTopic(userClicksTopic)
+    cluster.createTopic(userRegionsTopic)
+    cluster.createTopic(outputTopic)
+    cluster.createTopic(userClicksTopicJ)
+    cluster.createTopic(userRegionsTopicJ)
+    cluster.createTopic(outputTopicJ)
+  }
+
+  @Test def testShouldCountClicksPerRegion(): Unit = {
+
+    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced, 
+    // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
+    // get these instances automatically
+    import DefaultSerdes._
+
+    val streamsConfiguration: Properties = getStreamsConfiguration()
+
+    val builder = new StreamsBuilder()
+
+    val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+
+    val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+
+    // Compute the total per region by summing the individual click counts per region.
+    val clicksPerRegion: KTable[String, Long] =
+      userClicksStream
+
+        // Join the stream against the table.
+        .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+
+        // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+        .map((_, regionWithClicks) => regionWithClicks)
+
+        // Compute the total per region by summing the individual click counts per region.
+        .groupByKey
+        .reduce(_ + _)
+
+    // Write the (continuously updating) results to the output topic.
+    clicksPerRegion.toStream.to(outputTopic)
+
+    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
+    streams.start()
+
+
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = 
+      produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
+
+    streams.close()
+
+    import collection.JavaConverters._
+    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
+  }
+
+  @Test def testShouldCountClicksPerRegionJava(): Unit = {
+
+    import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
+    import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+    import collection.JavaConverters._
+    import java.lang.{Long => JLong}
+
+    val streamsConfiguration: Properties = getStreamsConfiguration()
+
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+
+    val builder: StreamsBuilderJ = new StreamsBuilderJ()
+
+    val userClicksStream: KStreamJ[String, JLong] = 
+      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String(), Serdes.Long()))
+
+    val userRegionsTable: KTableJ[String, String] = 
+      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String(), Serdes.String()))
+
+    // Join the stream against the table.
+    val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
+      .leftJoin(userRegionsTable, 
+        new ValueJoiner[JLong, String, (String, JLong)] {
+          def apply(clicks: JLong, region: String): (String, JLong) = 
+            (if (region == null) "UNKNOWN" else region, clicks)
+        }, 
+        Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String())) 
+
+    // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+    val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
+      .map { 
+        new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
+          def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+        }
+      }
+        
+    // Compute the total per region by summing the individual click counts per region.
+    val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
+      .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+      .reduce {
+        new Reducer[JLong] {
+          def apply(v1: JLong, v2: JLong) = v1 + v2
+        }
+      }
+        
+    // Write the (continuously updating) results to the output topic.
+    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+
+    val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration)
+
+    streams.start()
+
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = 
+      produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
+
+    streams.close()
+    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
+  }
+
+  private def getStreamsConfiguration(): Properties = {
+    val streamsConfiguration: Properties = new Properties()
+
+    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test")
+    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
+    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+
+    streamsConfiguration
+  }
+
+  private def getUserRegionsProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p
+  }
+
+  private def getUserClicksProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer])
+    p
+  }
+
+  private def getConsumerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer")
+    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
+    p
+  }
+
+  private def produceNConsume(userClicksTopic: String, userRegionsTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
+
+    import collection.JavaConverters._
+    
+    // Publish user-region information.
+    val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions.asJava, userRegionsProducerConfig, mockTime, false)
+
+    // Publish user-click information.
+    val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, userClicks.asJava, userClicksProducerConfig, mockTime, false)
+
+    // consume and verify result
+    val consumerConfig = getConsumerConfig()
+
+    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size)
+  }
+}
+
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
new file mode 100644
index 00000000000..45715a7abe6
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.KeyValue
+
+trait StreamToTableJoinTestData {
+  val brokers = "localhost:9092"
+
+  val userClicksTopic = "user-clicks"
+  val userRegionsTopic = "user-regions"
+  val outputTopic = "output-topic"
+
+  val userClicksTopicJ = "user-clicks-j"
+  val userRegionsTopicJ = "user-regions-j"
+  val outputTopicJ = "output-topic-j"
+
+  // Input 1: Clicks per user (multiple records allowed per user).
+  val userClicks: Seq[KeyValue[String, Long]] = Seq(
+    new KeyValue("alice", 13L),
+    new KeyValue("bob", 4L),
+    new KeyValue("chao", 25L),
+    new KeyValue("bob", 19L),
+    new KeyValue("dave", 56L),
+    new KeyValue("eve", 78L),
+    new KeyValue("alice", 40L),
+    new KeyValue("fang", 99L)
+  )
+
+  // Input 2: Region per user (multiple records allowed per user).
+  val userRegions: Seq[KeyValue[String, String]] = Seq(
+    new KeyValue("alice", "asia"), /* Alice lived in Asia originally... */
+    new KeyValue("bob", "americas"),
+    new KeyValue("chao", "asia"),
+    new KeyValue("dave", "europe"),
+    new KeyValue("alice", "europe"), /* ...but moved to Europe some time later. */
+    new KeyValue("eve", "americas"),
+    new KeyValue("fang", "asia")
+  )
+
+  val expectedClicksPerRegion: Seq[KeyValue[String, Long]] = Seq(
+    new KeyValue("americas", 101L),
+    new KeyValue("europe", 109L),
+    new KeyValue("asia", 124L)
+  )
+}
+
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
new file mode 100644
index 00000000000..89b2935b956
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -0,0 +1,199 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+import java.util.regex.Pattern
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Assert._
+import org.junit._
+
+import org.apache.kafka.streams.scala.kstream._
+
+import org.apache.kafka.common.serialization._
+
+import ImplicitConversions._
+import com.typesafe.scalalogging.LazyLogging
+
+import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ, _}
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+import collection.JavaConverters._
+
+/**
+ * Test suite that verifies that the topologies built via the Scala and Java APIs are identical.
+ */
+class TopologyTest extends JUnitSuite with LazyLogging {
+
+  val inputTopic = "input-topic"
+  val userClicksTopic = "user-clicks-topic"
+  val userRegionsTopic = "user-regions-topic"
+
+  val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaSimple() = {
+
+    // build the Scala topology
+    def getTopologyScala(): TopologyDescription = {
+
+      import DefaultSerdes._
+      import collection.JavaConverters._
+  
+      val streamBuilder = new StreamsBuilder
+      val textLines = streamBuilder.stream[String, String](inputTopic)
+  
+      val _: KStream[String, String] =
+        textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+  
+      streamBuilder.build().describe()
+    }
+  
+    // build the Java topology
+    def getTopologyJava(): TopologyDescription = {
+
+      val streamBuilder = new StreamsBuilderJ
+      val textLines = streamBuilder.stream[String, String](inputTopic)
+  
+      val _: KStreamJ[String, String] = textLines.flatMapValues {
+        new ValueMapper[String, java.lang.Iterable[String]] {
+          def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
+        }
+      }
+      streamBuilder.build().describe()
+    }
+
+    // should match
+    assertEquals(getTopologyScala(), getTopologyJava())
+  }
+
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaAggregate() = {
+
+    // build the Scala topology
+    def getTopologyScala(): TopologyDescription = {
+
+      import DefaultSerdes._
+      import collection.JavaConverters._
+  
+      val streamBuilder = new StreamsBuilder
+      val textLines = streamBuilder.stream[String, String](inputTopic)
+  
+      val _: KTable[String, Long] =
+        textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+          .groupBy((k, v) => v)
+          .count()
+  
+      streamBuilder.build().describe()
+    }
+
+    // build the Java topology
+    def getTopologyJava(): TopologyDescription = {
+
+      val streamBuilder = new StreamsBuilderJ
+      val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
+  
+      val splits: KStreamJ[String, String] = textLines.flatMapValues {
+        new ValueMapper[String, java.lang.Iterable[String]] {
+          def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
+        }
+      }
+  
+      val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
+        new KeyValueMapper[String, String, String] {
+          def apply(k: String, v: String): String = v
+        }
+      }
+  
+      val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+  
+      streamBuilder.build().describe()
+    }
+
+    // should match
+    assertEquals(getTopologyScala(), getTopologyJava())
+  }
+
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaJoin() = {
+
+    // build the Scala topology
+    def getTopologyScala(): TopologyDescription = {
+      import DefaultSerdes._
+  
+      val builder = new StreamsBuilder()
+  
+      val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+  
+      val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+  
+      val clicksPerRegion: KTable[String, Long] =
+        userClicksStream
+          .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+          .map((_, regionWithClicks) => regionWithClicks)
+          .groupByKey
+          .reduce(_ + _)
+
+      builder.build().describe()
+    }
+
+    // build the Java topology
+    def getTopologyJava(): TopologyDescription = {
+
+      import java.lang.{Long => JLong}
+  
+      val builder: StreamsBuilderJ = new StreamsBuilderJ()
+  
+      val userClicksStream: KStreamJ[String, JLong] = 
+        builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String(), Serdes.Long()))
+  
+      val userRegionsTable: KTableJ[String, String] = 
+        builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String(), Serdes.String()))
+  
+      // Join the stream against the table.
+      val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
+        .leftJoin(userRegionsTable, 
+          new ValueJoiner[JLong, String, (String, JLong)] {
+            def apply(clicks: JLong, region: String): (String, JLong) = 
+              (if (region == null) "UNKNOWN" else region, clicks)
+          }, 
+          Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String())) 
+  
+      // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+      val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
+        .map { 
+          new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
+            def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+          }
+        }
+          
+      // Compute the total per region by summing the individual click counts per region.
+      val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
+        .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+        .reduce {
+          new Reducer[JLong] {
+            def apply(v1: JLong, v2: JLong) = v1 + v2
+          }
+        }
+
+      builder.build().describe()
+    }
+
+    // should match
+    assertEquals(getTopologyScala(), getTopologyJava())
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
new file mode 100644
index 00000000000..f71e0cb4a13
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -0,0 +1,223 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+import java.util.regex.Pattern
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Assert._
+import org.junit._
+import org.junit.rules.TemporaryFolder
+
+import org.apache.kafka.streams.KeyValue
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.scala.kstream._
+
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.test.TestUtils
+
+import ImplicitConversions._
+import com.typesafe.scalalogging.LazyLogging
+
+/**
+ * Test suite that runs the classic word count example.
+ * <p>
+ * The suite contains the test case `testShouldCountWords`, written against the Scala APIs, and the same
+ * test case written against the Java APIs, `testShouldCountWordsJava`, to demonstrate that both produce the same result.
+ * <p>
+ * Note: in the current project settings SAM type conversion is turned off, as it is experimental in Scala 2.11.
+ * Hence the version based on the native Java APIs is more verbose.
+ */
+class WordCountTest extends JUnitSuite with WordCountTestData with LazyLogging {
+
+  private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
+
+  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
+
+  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
+  val mockTime: MockTime = cluster.time
+  mockTime.setCurrentTimeMs(alignedTime)
+
+
+  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
+  @Rule def testFolder: TemporaryFolder = tFolder
+    
+
+  @Before
+  def startKafkaCluster(): Unit = {
+    cluster.createTopic(inputTopic)
+    cluster.createTopic(outputTopic)
+    cluster.createTopic(inputTopicJ)
+    cluster.createTopic(outputTopicJ)
+  }
+
+  @Test def testShouldCountWords(): Unit = {
+
+    import DefaultSerdes._
+
+    val streamsConfiguration = getStreamsConfiguration()
+
+    val streamBuilder = new StreamsBuilder
+    val textLines = streamBuilder.stream[String, String](inputTopic)
+
+    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+
+    // generate word counts
+    val wordCounts: KTable[String, Long] =
+      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+        .groupBy((k, v) => v)
+        .count()
+
+    // write to output topic
+    wordCounts.toStream.to(outputTopic)
+
+    val streams: KafkaStreams = new KafkaStreams(streamBuilder.build(), streamsConfiguration)
+    streams.start()
+
+    // produce and consume synchronously
+    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopic, outputTopic)
+
+    streams.close()
+
+    import collection.JavaConverters._
+    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
+  }
+
+  @Test def testShouldCountWordsJava(): Unit = {
+
+    import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
+    import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+    import collection.JavaConverters._
+
+    val streamsConfiguration = getStreamsConfiguration()
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+
+    val streamBuilder = new StreamsBuilderJ
+    val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
+
+    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)
+
+    val splits: KStreamJ[String, String] = textLines.flatMapValues {
+      new ValueMapper[String, java.lang.Iterable[String]] {
+        def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
+      }
+    }
+
+    val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
+      new KeyValueMapper[String, String, String] {
+        def apply(k: String, v: String): String = v
+      }
+    }
+
+    val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+
+    wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+
+    val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
+    streams.start()
+
+    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopicJ, outputTopicJ)
+
+    streams.close()
+
+    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
+  }
+
+  private def getStreamsConfiguration(): Properties = {
+    val streamsConfiguration: Properties = new Properties()
+
+    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
+    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
+    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+    streamsConfiguration
+  }
+
+  private def getProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p
+  }
+
+  private def getConsumerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-scala-integration-test-standard-consumer")
+    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
+    p
+  }
+
+  private def produceNConsume(inputTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
+
+    val linesProducerConfig: Properties = getProducerConfig()
+
+    import collection.JavaConverters._
+    IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues.asJava, linesProducerConfig, mockTime)
+
+    val consumerConfig = getConsumerConfig()
+
+    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size)
+  }
+}
+
+trait WordCountTestData {
+  val inputTopic = "inputTopic"
+  val outputTopic = "outputTopic"
+  val inputTopicJ = "inputTopicJ"
+  val outputTopicJ = "outputTopicJ"
+
+  val inputValues = List(
+    "Hello Kafka Streams",
+    "All streams lead to Kafka",
+    "Join Kafka Summit",
+    "И теперь пошли русские слова"
+  )
+
+  val expectedWordCounts: List[KeyValue[String, Long]] = List(
+    new KeyValue("hello", 1L),
+    new KeyValue("all", 1L),
+    new KeyValue("streams", 2L),
+    new KeyValue("lead", 1L),
+    new KeyValue("to", 1L),
+    new KeyValue("join", 1L),
+    new KeyValue("kafka", 3L),
+    new KeyValue("summit", 1L),
+    new KeyValue("и", 1L),
+    new KeyValue("теперь", 1L),
+    new KeyValue("пошли", 1L),
+    new KeyValue("русские", 1L),
+    new KeyValue("слова", 1L)
+  )
+}
+
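
Aside (not part of the diff): the expectedClicksPerRegion values in StreamToTableJoinTestData above follow from attributing each user's clicks to that user's latest region from the userRegions input (e.g. Alice's 13 + 40 clicks count toward "europe", her final region), which is what the stream-table leftJoin in the corresponding test computes. A plain-Scala cross-check of that arithmetic, using the same data and no Kafka at all (object and value names here are illustrative only):

    // Arithmetic cross-check of expectedClicksPerRegion; not part of the PR.
    object ClicksPerRegionCheck extends App {
      val userClicks = Seq(
        "alice" -> 13L, "bob" -> 4L, "chao" -> 25L, "bob" -> 19L,
        "dave" -> 56L, "eve" -> 78L, "alice" -> 40L, "fang" -> 99L)

      // Latest region per user, i.e. the last value per key in userRegions.
      val latestRegion = Map(
        "alice" -> "europe", "bob" -> "americas", "chao" -> "asia",
        "dave" -> "europe", "eve" -> "americas", "fang" -> "asia")

      val clicksPerRegion: Map[String, Long] =
        userClicks
          .groupBy { case (user, _) => latestRegion(user) }
          .map { case (region, clicks) => region -> clicks.map(_._2).sum }

      // Prints: americas -> 101, europe -> 109, asia -> 124
      clicksPerRegion.foreach(println)
    }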


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Implement a Scala wrapper library for Kafka Streams
> ---------------------------------------------------
>
>                 Key: KAFKA-6670
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6670
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Debasish Ghosh
>            Assignee: Debasish Ghosh
>            Priority: Major
>              Labels: api, kip
>             Fix For: 1.2.0
>
>
> Implement a Scala wrapper library for Kafka Streams.
> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams
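
For readers skimming the thread, a minimal word-count sketch of the API this PR introduces, assembled from the calls exercised by the tests in the diff (the application id, bootstrap servers, and topic names are placeholders, not values from the PR):

    import java.util.Properties
    import java.util.regex.Pattern

    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.{KStream, KTable}
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.DefaultSerdes._

    object WordCountSketch extends App {
      val props = new Properties()
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-scala-sketch") // placeholder
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")      // placeholder

      val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

      val builder = new StreamsBuilder
      // Serdes are supplied by the implicits in DefaultSerdes, so no Consumed/Produced boilerplate.
      val textLines: KStream[String, String] = builder.stream[String, String]("input-topic")

      val wordCounts: KTable[String, Long] =
        textLines
          .flatMapValues(v => pattern.split(v.toLowerCase))
          .groupBy((k, v) => v)
          .count()

      wordCounts.toStream.to("output-topic")

      val streams = new KafkaStreams(builder.build(), props)
      streams.start()
      sys.ShutdownHookThread { streams.close() }
    }

The point of the wrapper, as the paired Scala/Java test cases above show, is that Scala lambdas and implicit serdes replace the explicit ValueMapper/KeyValueMapper instances and Consumed/Produced/Serialized arguments required by the Java API, while building an identical topology.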



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)