Posted to commits@samza.apache.org by pm...@apache.org on 2018/07/24 16:01:58 UTC

samza git commit: SAMZA-1773: Side inputs for local stores

Repository: samza
Updated Branches:
  refs/heads/master 9f8d593c4 -> fa56b15dc


SAMZA-1773: Side inputs for local stores

prateekm vjagadish
Please take a look.

I will update the PR with the unit tests for SideInputStorageManager and the util functions.

Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Author: Prateek Maheshwari <pm...@apache.org>
Author: Prateek Maheshwari <pm...@pmaheshw-mn2.linkedin.biz>

Reviewers: Cameron Lee <ca...@linkedin.com>, Jagadish Venkatraman <vj...@gmail.com>, Shanthoosh Venkatraman <sv...@linkedin.com>

Closes #570 from bharathkk/side-input-v3


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa56b15d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa56b15d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa56b15d

Branch: refs/heads/master
Commit: fa56b15dc55f006cda57eec7d8dfc999db8b94fb
Parents: 9f8d593
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Tue Jul 24 09:01:09 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Jul 24 09:01:09 2018 -0700

----------------------------------------------------------------------
 .../samza/storage/SideInputsProcessor.java      |  46 +++
 .../storage/SideInputsProcessorFactory.java     |  45 +++
 .../java/org/apache/samza/table/TableSpec.java  |  44 ++-
 .../apache/samza/config/JavaStorageConfig.java  |  47 +++
 .../apache/samza/container/TaskContextImpl.java |  15 +-
 .../org/apache/samza/execution/JobNode.java     |  20 +-
 .../samza/storage/StorageManagerUtil.java       | 138 +++++++
 .../storage/TaskSideInputStorageManager.java    | 379 +++++++++++++++++++
 .../org/apache/samza/config/StorageConfig.scala |  12 +
 .../org/apache/samza/config/StreamConfig.scala  |   2 +-
 .../apache/samza/container/SamzaContainer.scala |  66 +++-
 .../apache/samza/container/TaskInstance.scala   | 130 +++++--
 .../samza/storage/TaskStorageManager.scala      |  96 +----
 .../scala/org/apache/samza/util/FileUtil.scala  |   8 +-
 .../org/apache/samza/util/ScalaJavaUtil.scala   |  12 +
 .../main/scala/org/apache/samza/util/Util.scala |  20 +
 .../org/apache/samza/task/TestAsyncRunLoop.java |   2 +-
 .../samza/container/TestTaskInstance.scala      |   5 +-
 .../samza/storage/TestTaskStorageManager.scala  |  25 +-
 .../kv/inmemory/InMemoryTableDescriptor.java    |   3 +-
 .../storage/kv/RocksDbTableDescriptor.java      |   3 +-
 .../kv/BaseLocalStoreBackedTableDescriptor.java |  21 +
 .../kv/BaseLocalStoreBackedTableProvider.java   |  12 +
 23 files changed, 983 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java
new file mode 100644
index 0000000..d2dcac5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+
+/**
+ * The processing logic for store side inputs. Accepts incoming messages from side input streams
+ * and the current store contents, and returns the new key-value entries to be written to the store.
+ */
+@FunctionalInterface
+@InterfaceStability.Unstable
+public interface SideInputsProcessor extends Serializable {
+
+  /**
+   * Process the incoming side input message for the {@code store}.
+   *
+   * @param message incoming message envelope
+   * @param store the store associated with the incoming message envelope
+   * @return a {@link Collection} of {@link Entry}s that will be written to the {@code store}.
+   */
+  Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store);
+}
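
For illustration, here is a minimal sketch of an implementation, assuming the side input
envelopes already carry the desired store key and value (IdentitySideInputsProcessor is a
hypothetical name, not part of this patch):

import java.util.Collection;
import java.util.Collections;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;

// Hypothetical processor that writes every side input message to the store unchanged.
public class IdentitySideInputsProcessor implements SideInputsProcessor {
  @Override
  public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
    // The returned entries are written back to the store by the storage manager.
    return Collections.singletonList(new Entry<>(message.getKey(), message.getMessage()));
  }
}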

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java
new file mode 100644
index 0000000..fb7fc2c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * A factory to build {@link SideInputsProcessor}s.
+ *
+ * Implementations should return a new instance for every invocation of
+ * {@link #getSideInputsProcessor(Config, MetricsRegistry)}.
+ */
+@FunctionalInterface
+@InterfaceStability.Unstable
+public interface SideInputsProcessorFactory extends Serializable {
+  /**
+   * Creates a new instance of a {@link SideInputsProcessor}.
+   *
+   * @param config the configuration
+   * @param metricsRegistry the metrics registry
+   * @return an instance of {@link SideInputsProcessor}
+   */
+  SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry);
+}
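
A matching factory is then a one-liner; a sketch reusing the hypothetical processor above,
returning a fresh instance per invocation as the javadoc requires:

import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.storage.SideInputsProcessor;
import org.apache.samza.storage.SideInputsProcessorFactory;

// Hypothetical factory for the IdentitySideInputsProcessor sketched earlier.
public class IdentitySideInputsProcessorFactory implements SideInputsProcessorFactory {
  @Override
  public SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry) {
    return new IdentitySideInputsProcessor(); // new instance for every invocation
  }
}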

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
index ba57c2f..82577c4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -21,10 +21,12 @@ package org.apache.samza.table;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.storage.SideInputsProcessor;
 
 
 /**
@@ -52,6 +54,8 @@ public class TableSpec implements Serializable {
    * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis
    */
   private transient final KVSerde serde;
+  private transient final List<String> sideInputs;
+  private transient final SideInputsProcessor sideInputsProcessor;
   private transient final Map<String, String> config = new HashMap<>();
 
   /**
@@ -61,6 +65,8 @@ public class TableSpec implements Serializable {
     this.id = null;
     this.serde = null;
     this.tableProviderFactoryClassName = null;
+    this.sideInputs = null;
+    this.sideInputsProcessor = null;
   }
 
   /**
@@ -71,12 +77,28 @@ public class TableSpec implements Serializable {
    * @param serde the serde
    * @param config implementation specific configuration
    */
-  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName,
-      Map<String, String> config) {
+  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config) {
+    this(tableId, serde, tableProviderFactoryClassName, config, Collections.emptyList(), null);
+  }
+
+  /**
+   * Constructs a {@link TableSpec}
+   *
+   * @param tableId the id of the table
+   * @param serde the serde
+   * @param tableProviderFactoryClassName class name of the table provider factory
+   * @param config implementation specific configuration
+   * @param sideInputs list of side inputs for the table
+   * @param sideInputsProcessor side input processor for the table
+   */
+  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config,
+      List<String> sideInputs, SideInputsProcessor sideInputsProcessor) {
     this.id = tableId;
     this.serde = serde;
     this.tableProviderFactoryClassName = tableProviderFactoryClassName;
     this.config.putAll(config);
+    this.sideInputs = sideInputs;
+    this.sideInputsProcessor = sideInputsProcessor;
   }
 
   /**
@@ -113,6 +135,24 @@ public class TableSpec implements Serializable {
     return Collections.unmodifiableMap(config);
   }
 
+  /**
+   * Get the list of side inputs for the table.
+   *
+   * @return a {@link List} of side input streams
+   */
+  public List<String> getSideInputs() {
+    return sideInputs;
+  }
+
+  /**
+   * Get the {@link SideInputsProcessor} associated with the table.
+   *
+   * @return a {@link SideInputsProcessor}
+   */
+  public SideInputsProcessor getSideInputsProcessor() {
+    return sideInputsProcessor;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
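
For illustration, the new six-argument constructor could be used along these lines (all names
below are placeholders, not part of this patch):

import java.util.Arrays;
import java.util.Collections;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.table.TableSpec;

// Sketch: a table spec whose backing store is populated from a side input stream.
TableSpec spec = new TableSpec(
    "profile-table",                                   // hypothetical table id
    KVSerde.of(new StringSerde("UTF-8"), new StringSerde("UTF-8")),
    "com.example.MyTableProviderFactory",              // hypothetical provider factory class
    Collections.emptyMap(),                            // no implementation specific config
    Arrays.asList("kafka.profile-updates"),            // side input stream(s)
    new IdentitySideInputsProcessor());                // processor sketched earlier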

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index bbf0ccf..1e3dbff 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -20,7 +20,11 @@
 package org.apache.samza.config;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.execution.StreamManager;
@@ -43,6 +47,10 @@ public class JavaStorageConfig extends MapConfig {
   private static final String ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled";
   private static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50;
 
+  public static final String SIDE_INPUTS = "stores.%s.side.inputs";
+  public static final String SIDE_INPUTS_PROCESSOR_FACTORY = "stores.%s.side.inputs.processor.factory";
+  public static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = "stores.%s.side.inputs.processor.serialized.instance";
+
   public JavaStorageConfig(Config config) {
     super(config);
   }
@@ -126,4 +134,43 @@ public class JavaStorageConfig extends MapConfig {
   public String getChangelogSystem() {
     return get(CHANGELOG_SYSTEM,  get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
   }
+
+  /**
+   * Gets the side inputs for the store. A store can have multiple side input streams, which are
+   * provided as a comma-separated list.
+   *
+   * Each side input must either be a {@code streamId}, or of the format {@code systemName.streamName}.
+   * E.g. {@code stores.storeName.side.inputs = kafka.topicA, mySystem.topicB}
+   *
+   * @param storeName name of the store
+   * @return a list of side input streams for the store, or an empty list if it has none.
+   */
+  public List<String> getSideInputs(String storeName) {
+    return Optional.ofNullable(get(String.format(SIDE_INPUTS, storeName), null))
+        .map(inputs -> Stream.of(inputs.split(","))
+            .map(String::trim)
+            .filter(input -> !input.isEmpty())
+            .collect(Collectors.toList()))
+        .orElse(Collections.emptyList());
+  }
+
+  /**
+   * Gets the SideInputsProcessorFactory associated with the {@code storeName}.
+   *
+   * @param storeName name of the store
+   * @return the class name of SideInputsProcessorFactory if present, null otherwise
+   */
+  public String getSideInputsProcessorFactory(String storeName) {
+    return get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName), null);
+  }
+
+  /**
+   * Gets the serialized instance of SideInputsProcessor associated with the {@code storeName}.
+   *
+   * @param storeName name of the store
+   * @return the serialized instance of SideInputsProcessor if present, null otherwise
+   */
+  public String getSideInputsProcessorSerializedInstance(String storeName) {
+    return get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName), null);
+  }
 }
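
Concretely, for a store named profile-store the new keys would look like this in job config
(illustrative values; the factory class is the hypothetical one sketched earlier):

stores.profile-store.side.inputs=kafka.profile-updates, mySystem.topicB
stores.profile-store.side.inputs.processor.factory=com.example.IdentitySideInputsProcessorFactory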

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 0d76a33..d65be4c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -20,10 +20,10 @@
 package org.apache.samza.container;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.function.Function;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.storage.TaskStorageManager;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
@@ -48,7 +48,7 @@ public class TaskContextImpl implements TaskContext {
   private final SamzaContainerContext containerContext;
   private final Set<SystemStreamPartition> systemStreamPartitions;
   private final OffsetManager offsetManager;
-  private final TaskStorageManager storageManager;
+  private final Function<String, KeyValueStore> kvStoreSupplier;
   private final TableManager tableManager;
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
@@ -62,7 +62,7 @@ public class TaskContextImpl implements TaskContext {
                          SamzaContainerContext containerContext,
                          Set<SystemStreamPartition> systemStreamPartitions,
                          OffsetManager offsetManager,
-                         TaskStorageManager storageManager,
+                         Function<String, KeyValueStore> kvStoreSupplier,
                          TableManager tableManager,
                          JobModel jobModel,
                          StreamMetadataCache streamMetadataCache,
@@ -72,7 +72,7 @@ public class TaskContextImpl implements TaskContext {
     this.containerContext = containerContext;
     this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions);
     this.offsetManager = offsetManager;
-    this.storageManager = storageManager;
+    this.kvStoreSupplier = kvStoreSupplier;
     this.tableManager = tableManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
@@ -91,12 +91,11 @@ public class TaskContextImpl implements TaskContext {
 
   @Override
   public KeyValueStore getStore(String storeName) {
-    if (storageManager != null) {
-      return (KeyValueStore) storageManager.apply(storeName);
-    } else {
+    KeyValueStore store = kvStoreSupplier.apply(storeName);
+    if (store == null) {
       LOG.warn("No store found for name: {}", storeName);
-      return null;
     }
+    return store;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 6507996..288b1a1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -140,7 +140,7 @@ public class JobNode {
         inputs.add(formattedSystemStream);
       }
     }
-    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+
     if (!broadcasts.isEmpty()) {
       // TODO: remove this once we support defining broadcast input stream in high-level
       // task.broadcast.input should be generated by the planner in the future.
@@ -179,6 +179,24 @@ public class JobNode {
 
     configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables));
 
+    // Add side inputs to the inputs and mark the stream as bootstrap
+    tables.forEach(tableSpec -> {
+        List<String> sideInputs = tableSpec.getSideInputs();
+        if (sideInputs != null && !sideInputs.isEmpty()) {
+          sideInputs.stream()
+              .map(sideInput -> Util.getSystemStreamFromNameOrId(config, sideInput))
+              .forEach(systemStream -> {
+                  inputs.add(Util.getNameFromSystemStream(systemStream));
+                  configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+                      systemStream.getSystem(), systemStream.getStream()), "true");
+                });
+        }
+      });
+
+    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+
+    log.info("Job {} has generated configs {}", jobName, configs);
+
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
 
     // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
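
The net effect of the side input block above: each side input stream is appended to task.inputs
and flagged as a bootstrap stream, so it is fully consumed before regular processing starts.
Assuming StreamConfig.BOOTSTRAP resolves to "samza.bootstrap" (its usual value), a side input
kafka.topicA should yield a generated entry roughly like:

systems.kafka.streams.topicA.samza.bootstrap=true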

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
new file mode 100644
index 0000000..731a84d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StorageManagerUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class);
+
+  /**
+   * Fetch the starting offset for the input {@link SystemStreamPartition}
+   *
+   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT()} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET()} configurations. It will use the locally
+   * checkpointed offset if it is valid, or fall back to the oldest offset of the stream.
+   *
+   * @param ssp system stream partition for which starting offset is requested
+   * @param admin system admin associated with the ssp
+   * @param fileOffset local file offset for the ssp
+   * @param oldestOffset oldest offset for the ssp from the source
+   * @return starting offset for the incoming {@link SystemStreamPartition}
+   */
+  public static String getStartingOffset(
+      SystemStreamPartition ssp, SystemAdmin admin, String fileOffset, String oldestOffset) {
+    String startingOffset = oldestOffset;
+    if (fileOffset != null) {
+      // The file offset is the offset of the last message that was both checkpointed locally and
+      // reflected in the store, so we resume from the NEXT offset
+      String resumeOffset = admin.getOffsetsAfter(ImmutableMap.of(ssp, fileOffset)).get(ssp);
+      if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
+        startingOffset = resumeOffset;
+      } else {
+        // If the offset we plan to use is older than the oldest offset, just use the oldest offset.
+        // This can happen when the source of the store (changelog, etc.) is configured with a TTL cleanup policy.
+        LOG.warn("Local store offset {} is lower than the oldest offset {} of the source stream."
+            + " The values between these offsets cannot be restored.", resumeOffset, oldestOffset);
+      }
+    }
+
+    return startingOffset;
+  }
+
+  /**
+   * Checks if the store is stale. If the time elapsed since the last modified time of the offset file is greater than
+   * the {@code storeDeleteRetentionInMs}, then the store is considered stale.
+   *
+   * @param storeDir the base directory of the store
+   * @param offsetFileName the offset file name
+   * @param storeDeleteRetentionInMs store delete retention in millis
+   * @param currentTimeMs current time in ms
+   * @return true if the store is stale, false otherwise
+   */
+  public static boolean isStaleStore(
+      File storeDir, String offsetFileName, long storeDeleteRetentionInMs, long currentTimeMs) {
+    boolean isStaleStore = false;
+    String storePath = storeDir.toPath().toString();
+    if (storeDir.exists()) {
+      File offsetFileRef = new File(storeDir, offsetFileName);
+      long offsetFileLastModifiedTime = offsetFileRef.lastModified();
+      if ((currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs) {
+        LOG.info(
+            String.format("Store: %s is stale since lastModifiedTime of offset file: %d, is older than store deleteRetentionMs: %d.",
+            storePath, offsetFileLastModifiedTime, storeDeleteRetentionInMs));
+        isStaleStore = true;
+      }
+    } else {
+      LOG.info("Storage partition directory: {} does not exist.", storePath);
+    }
+    return isStaleStore;
+  }
+
+  /**
+   * An offset file associated with a logged store {@code storeDir} is valid if it exists and is not empty.
+   *
+   * @param storeDir the base directory of the store
+   * @param offsetFileName name of the offset file
+   * @return true if the offset file is valid, false otherwise.
+   */
+  public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
+    boolean hasValidOffsetFile = false;
+    if (storeDir.exists()) {
+      String offsetContents = readOffsetFile(storeDir, offsetFileName);
+      if (offsetContents != null && !offsetContents.isEmpty()) {
+        hasValidOffsetFile = true;
+      } else {
+        LOG.info("Offset file is not valid for store: {}.", storeDir.toPath());
+      }
+    }
+
+    return hasValidOffsetFile;
+  }
+
+  /**
+   * Read and return the contents of the offset file.
+   *
+   * @param storagePartitionDir the base directory of the store
+   * @param offsetFileName name of the offset file
+   * @return the content of the offset file if it exists for the store, null otherwise.
+   */
+  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
+    String offset = null;
+    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+    String storePath = storagePartitionDir.getPath();
+
+    if (offsetFileRef.exists()) {
+      LOG.info("Found offset file in storage partition directory: {}", storePath);
+      offset = FileUtil.readWithChecksum(offsetFileRef);
+    } else {
+      LOG.info("No offset file found in storage partition directory: {}", storePath);
+    }
+
+    return offset;
+  }
+}
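
A quick worked example of the offset resolution above, with made-up numeric offsets: if the
offset file holds 41, getOffsetsAfter returns a resume offset of 42; with an oldest offset of 7,
restoration starts at 42. If a retention policy has since advanced the oldest offset to 100, the
method falls back to 100 and logs the warning above, since offsets 42 through 99 can no longer
be restored.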

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
new file mode 100644
index 0000000..7a0a822
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * A storage manager for all side input stores. It is associated with each {@link org.apache.samza.container.TaskInstance}
+ * and is responsible for handling directory management, offset tracking and offset file management for the side input stores.
+ */
+public class TaskSideInputStorageManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
+  private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
+  private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private final Clock clock;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final Map<String, StorageEngine> stores;
+  private final String storeBaseDir;
+  private final Map<String, Set<SystemStreamPartition>> storeToSSps;
+  private final Map<SystemStreamPartition, Set<String>> sspsToStores;
+  private final StreamMetadataCache streamMetadataCache;
+  private final SystemAdmins systemAdmins;
+  private final TaskName taskName;
+  private final JavaStorageConfig storageConfig;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputStorageManager(
+      TaskName taskName,
+      StreamMetadataCache streamMetadataCache,
+      String storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, SideInputsProcessor> storesToProcessor,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      SystemAdmins systemAdmins,
+      Config config,
+      Clock clock) {
+    this.clock = clock;
+    this.storageConfig = new JavaStorageConfig(config);
+    this.stores = sideInputStores;
+    this.storeBaseDir = storeBaseDir;
+    this.storeToSSps = storesToSSPs;
+    this.streamMetadataCache = streamMetadataCache;
+    this.systemAdmins = systemAdmins;
+    this.taskName = taskName;
+    this.storeToProcessor = storesToProcessor;
+
+    validateStoreConfiguration();
+
+    this.sspsToStores = new HashMap<>();
+    storesToSSPs.forEach((store, ssps) -> {
+        for (SystemStreamPartition ssp: ssps) {
+          sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>()).add(store);
+        }
+      });
+  }
+
+  /**
+   * Initializes the side input storage manager.
+   */
+  public void init() {
+    LOG.info("Initializing side input stores.");
+
+    Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
+    LOG.info("File offsets for the task {}: ", taskName, fileOffsets);
+
+    Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
+    LOG.info("Oldest offsets for the task {}: ", taskName, fileOffsets);
+
+    startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    initializeStoreDirectories();
+  }
+
+  /**
+   * Flushes the contents of the underlying stores and writes the offset files to disk.
+   */
+  public void flush() {
+    LOG.info("Flushing the side input stores.");
+    stores.values().forEach(StorageEngine::flush);
+    writeOffsetFiles();
+  }
+
+  /**
+   * Stops the storage engines for all the stores and writes the offset files to disk.
+   */
+  public void stop() {
+    LOG.info("Stopping the side input stores.");
+    stores.values().forEach(StorageEngine::stop);
+    writeOffsetFiles();
+  }
+
+  /**
+   * Gets the {@link StorageEngine} associated with the input {@code storeName} if found, or null.
+   *
+   * @param storeName store name to get the {@link StorageEngine} for
+   * @return the {@link StorageEngine} associated with {@code storeName} if found, or null
+   */
+  public StorageEngine getStore(String storeName) {
+    return stores.get(storeName);
+  }
+
+  /**
+   * Gets the starting offset for the given side input {@link SystemStreamPartition}.
+   *
+   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT()} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET()} configurations. It will use the local offset
+   * file if it is valid, else it will fall back to the oldest offset in the stream.
+   *
+   * @param ssp side input system stream partition to get the starting offset for
+   * @return the starting offset
+   */
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return startingOffsets.get(ssp);
+  }
+
+  /**
+   * Gets the last processed offset for the given side input {@link SystemStreamPartition}.
+   *
+   * @param ssp side input system stream partition to get the last processed offset for
+   * @return the last processed offset
+   */
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return lastProcessedOffsets.get(ssp);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   *
+   * @param message incoming message to be processed
+   */
+  public void process(IncomingMessageEnvelope message) {
+    SystemStreamPartition ssp = message.getSystemStreamPartition();
+    Set<String> storeNames = sspsToStores.get(ssp);
+
+    for (String storeName : storeNames) {
+      SideInputsProcessor sideInputsProcessor = storeToProcessor.get(storeName);
+
+      KeyValueStore keyValueStore = (KeyValueStore) stores.get(storeName);
+      Collection<Entry<?, ?>> entriesToBeWritten = sideInputsProcessor.process(message, keyValueStore);
+      keyValueStore.putAll(ImmutableList.copyOf(entriesToBeWritten));
+    }
+
+    // update the last processed offset
+    lastProcessedOffsets.put(ssp, message.getOffset());
+  }
+
+  /**
+   * Initializes the store directories for all the stores:
+   *  1. Cleans up the directories for invalid stores.
+   *  2. Ensures that the directories exist.
+   */
+  private void initializeStoreDirectories() {
+    LOG.info("Initializing side input store directories.");
+
+    stores.keySet().forEach(storeName -> {
+        File storeLocation = getStoreLocation(storeName);
+        String storePath = storeLocation.toPath().toString();
+        if (!isValidSideInputStore(storeName, storeLocation)) {
+          LOG.info("Cleaning up the store directory at {} for {}", storePath, storeName);
+          FileUtil.rm(storeLocation);
+        }
+
+        if (!storeLocation.exists()) {
+          LOG.info("Creating {} as the store directory for the side input store {}", storePath, storeName);
+          storeLocation.mkdirs();
+        }
+      });
+  }
+
+  /**
+   * Writes the offset files for all side input stores one by one. There is one offset file per store.
+   * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum.
+   */
+  private void writeOffsetFiles() {
+    storeToSSps.entrySet().stream()
+        .filter(entry -> isPersistedStore(entry.getKey())) // filter out in-memory side input stores
+        .forEach((entry) -> {
+            String storeName = entry.getKey();
+            Map<SystemStreamPartition, String> offsets = entry.getValue().stream()
+              .filter(lastProcessedOffsets::containsKey)
+              .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
+
+            try {
+              String fileContents = OBJECT_MAPPER.writeValueAsString(offsets);
+              File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE);
+              FileUtil.writeWithChecksum(offsetFile, fileContents);
+            } catch (Exception e) {
+              throw new SamzaException("Failed to write offset file for side input store: " + storeName, e);
+            }
+          });
+  }
+
+  /**
+   * Gets the side input SSP offsets for all stores from their local offset files.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset in the offset files.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<SystemStreamPartition, String> getFileOffsets() {
+    LOG.info("Loading initial offsets from the file for side input stores.");
+    Map<SystemStreamPartition, String> fileOffsets = new HashMap<>();
+
+    stores.keySet().forEach(storeName -> {
+        LOG.debug("Reading local offsets for store: {}", storeName);
+
+        File storeLocation = getStoreLocation(storeName);
+        if (isValidSideInputStore(storeName, storeLocation)) {
+          try {
+            String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE);
+            Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, Map.class);
+            fileOffsets.putAll(offsets);
+          } catch (Exception e) {
+            LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
+          }
+        }
+      });
+
+    return fileOffsets;
+  }
+
+  private File getStoreLocation(String storeName) {
+    return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
+  }
+
+  /**
+   * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores.
+   * If a local file offset is available, the offset that follows it is used, unless that offset is
+   * older than the oldest offset available in the source, in which case the oldest offset is used.
+   *
+   * @param fileOffsets offsets from the local offset file
+   * @param oldestOffsets oldest offsets from the source
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset
+   */
+  private Map<SystemStreamPartition, String> getStartingOffsets(
+      Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
+    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
+
+    sspsToStores.keySet().forEach(ssp -> {
+        String fileOffset = fileOffsets.get(ssp);
+        String oldestOffset = oldestOffsets.get(ssp);
+
+        startingOffsets.put(ssp,
+          StorageManagerUtil.getStartingOffset(
+            ssp, systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, oldestOffset));
+      });
+
+    return startingOffsets;
+  }
+
+  /**
+   * Gets the oldest offset for the {@link SystemStreamPartition}s associated with all the store side inputs.
+   *   1. Groups the list of the SSPs based on system stream
+   *   2. Fetches the {@link SystemStreamMetadata} from {@link StreamMetadataCache}
+   *   3. Uses the partition metadata of each system stream to populate the oldest offset for the SSPs
+   *      belonging to that system stream.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset.
+   */
+  private Map<SystemStreamPartition, String> getOldestOffsets() {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+
+    // Step 1
+    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = sspsToStores.keySet().stream()
+        .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+    // Step 2
+    Map<SystemStream, SystemStreamMetadata> metadata = JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(), false)).asJava();
+
+    // Step 3
+    metadata.forEach((systemStream, systemStreamMetadata) -> {
+        // get the partition metadata for each system stream
+        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
+          systemStreamMetadata.getSystemStreamPartitionMetadata();
+
+        // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
+        Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream()
+          .collect(
+              Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset()));
+
+        oldestOffsets.putAll(offsets);
+      });
+
+    return oldestOffsets;
+  }
+
+  private boolean isValidSideInputStore(String storeName, File storeLocation) {
+    return isPersistedStore(storeName)
+        && !StorageManagerUtil.isStaleStore(storeLocation, OFFSET_FILE, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
+        && StorageManagerUtil.isOffsetFileValid(storeLocation, OFFSET_FILE);
+  }
+
+  private boolean isPersistedStore(String storeName) {
+    return Optional.ofNullable(stores.get(storeName))
+        .map(StorageEngine::getStoreProperties)
+        .map(StoreProperties::isPersistedToDisk)
+        .orElse(false);
+  }
+
+  private void validateStoreConfiguration() {
+    stores.forEach((storeName, storageEngine) -> {
+        if (StringUtils.isBlank(storageConfig.getSideInputsProcessorFactory(storeName))) {
+          throw new SamzaException(
+              String.format("Side inputs processor factory configuration missing for store: %s.", storeName));
+        }
+
+        if (storageEngine.getStoreProperties().isLoggedStore()) {
+          throw new SamzaException(
+              String.format("Cannot configure both side inputs and a changelog for store: %s.", storeName));
+        }
+      });
+  }
+}
\ No newline at end of file
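
For illustration, the offset file written by writeOffsetFiles is a checksummed JSON mapping from
each side input SSP to its last processed offset. With a single partition it would look roughly
like this (hypothetical stream and offset; the key format follows SystemStreamPartition's default
serialization):

{"SystemStreamPartition [kafka, profile-updates, 0]": "42"}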

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index e4ee767..c9df3b5 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -76,6 +76,18 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
     conf.asScala.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
   }
 
+  def getSideInputs(storeName: String): Seq[String] = {
+    new JavaStorageConfig(config).getSideInputs(storeName).asScala
+  }
+
+  def getSideInputsProcessorFactory(storeName: String): Option[String] = {
+    Option(new JavaStorageConfig(config).getSideInputsProcessorFactory(storeName))
+  }
+
+  def getSideInputsProcessorSerializedInstance(storeName: String): Option[String] = {
+    Option(new JavaStorageConfig(config).getSideInputsProcessorSerializedInstance(storeName))
+  }
+
   /**
     * Build a map of storeName to changeLogDeleteRetention for all of the stores.
     * @return a map from storeName to the changeLogDeleteRetention of the store in ms.

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 0a4623e..298c8ca 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -42,8 +42,8 @@ object StreamConfig {
 
   // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
   private val STREAMS_PREFIX = "streams."
-  private val STREAM_PREFIX = "systems.%s.streams.%s."
 
+  val STREAM_PREFIX = "systems.%s.streams.%s."
   val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
   val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
   val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89278ad..35802ac 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -30,7 +30,6 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -48,10 +47,11 @@ import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager}
+import org.apache.samza.storage._
 import org.apache.samza.system._
 import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
 import org.apache.samza.table.TableManager
+import org.apache.samza.table.utils.SerdeUtils
 import org.apache.samza.task._
 import org.apache.samza.util.Util
 import org.apache.samza.util._
@@ -354,6 +354,14 @@ object SamzaContainer extends Logging {
 
     info("Got intermediate streams: %s" format intermediateStreams)
 
+    val sideInputStoresToSystemStreams = config.getStoreNames
+      .map { storeName => (storeName, config.getSideInputs(storeName)) }
+      .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
+      .map { case (storeName, sideInputs) => (storeName, sideInputs.map(Util.getSystemStreamFromNameOrId(config, _))) }
+      .toMap
+
+    info("Got side input store system streams: %s" format sideInputStoresToSystemStreams)
+
     val controlMessageKeySerdes = intermediateStreams
       .flatMap(streamId => {
         val systemStream = config.streamIdToSystemStream(streamId)
@@ -531,7 +539,7 @@ object SamzaContainer extends Logging {
       val nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(config, defaultStoreBaseDir)
       info("Got base directory for non logged data stores: %s" format nonLoggedStorageBaseDir)
 
-      var loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir)
+      val loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir)
       info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
       val taskStores = storageEngineFactories
@@ -577,9 +585,32 @@ object SamzaContainer extends Logging {
 
       info("Got task stores: %s" format taskStores)
 
+      val taskSSPs = taskModel.getSystemStreamPartitions.asScala.toSet
+      info("Got task SSPs: %s" format taskSSPs)
+
+      val (sideInputStores, nonSideInputStores) =
+        taskStores.partition { case (storeName, _) => sideInputStoresToSystemStreams.contains(storeName)}
+
+      val sideInputStoresToSSPs = sideInputStoresToSystemStreams.mapValues(sideInputSystemStreams =>
+        taskSSPs.filter(ssp => sideInputSystemStreams.contains(ssp.getSystemStream)).asJava)
+
+      val taskSideInputSSPs = sideInputStoresToSSPs.values.flatMap(_.asScala).toSet
+
+      info ("Got task side input SSPs: %s" format taskSideInputSSPs)
+
+      val sideInputStoresToProcessor = sideInputStores.keys.map(storeName => {
+          // a serialized instance takes precedence over the factory configuration.
+          config.getSideInputsProcessorSerializedInstance(storeName).map(serializedInstance =>
+              (storeName, SerdeUtils.deserialize("Side Inputs Processor", serializedInstance)))
+            .orElse(config.getSideInputsProcessorFactory(storeName).map(factoryClassName =>
+              (storeName, Util.getObj(factoryClassName, classOf[SideInputsProcessorFactory])
+                .getSideInputsProcessor(config, taskInstanceMetrics.registry))))
+            .get
+        }).toMap
+
       val storageManager = new TaskStorageManager(
         taskName = taskName,
-        taskStores = taskStores,
+        taskStores = nonSideInputStores,
         storeConsumers = storeConsumers,
         changeLogSystemStreams = changeLogSystemStreams,
         maxChangeLogStreamPartitions,
@@ -592,17 +623,24 @@ object SamzaContainer extends Logging {
         new StorageConfig(config).getChangeLogDeleteRetentionsInMs,
         new SystemClock)
 
+      var sideInputStorageManager: TaskSideInputStorageManager = null
+      if (sideInputStores.nonEmpty) {
+        sideInputStorageManager = new TaskSideInputStorageManager(
+          taskName,
+          streamMetadataCache,
+          loggedStorageBaseDir.getPath,
+          sideInputStores.asJava,
+          sideInputStoresToProcessor.asJava,
+          sideInputStoresToSSPs.asJava,
+          systemAdmins,
+          config,
+          new SystemClock)
+      }
+
       val tableManager = new TableManager(config, serdes.asJava)
 
       info("Got table manager")
 
-      val systemStreamPartitions = taskModel
-        .getSystemStreamPartitions
-        .asScala
-        .toSet
-
-      info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName)
-
       def createTaskInstance(task: Any): TaskInstance = new TaskInstance(
           task = task,
           taskName = taskName,
@@ -616,11 +654,13 @@ object SamzaContainer extends Logging {
           storageManager = storageManager,
           tableManager = tableManager,
           reporters = reporters,
-          systemStreamPartitions = systemStreamPartitions,
+          systemStreamPartitions = taskSSPs,
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config),
           jobModel = jobModel,
           streamMetadataCache = streamMetadataCache,
-          timerExecutor = timerExecutor)
+          timerExecutor = timerExecutor,
+          sideInputSSPs = taskSideInputSSPs,
+          sideInputStorageManager = sideInputStorageManager)
 
       val taskInstance = createTaskInstance(task)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 64ee7f3..0caca4f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -28,11 +28,12 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
 import org.apache.samza.system._
 import org.apache.samza.table.TableManager
 import org.apache.samza.task._
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Logging, ScalaJavaUtil}
 
 import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
@@ -55,15 +56,29 @@ class TaskInstance(
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
   jobModel: JobModel = null,
   streamMetadataCache: StreamMetadataCache = null,
-  timerExecutor : ScheduledExecutorService = null) extends Logging {
+  timerExecutor : ScheduledExecutorService = null,
+  sideInputSSPs: Set[SystemStreamPartition] = Set(),
+  sideInputStorageManager: TaskSideInputStorageManager = null) extends Logging {
+
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
+  val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
+    (storeName: String) => {
+      if (storageManager != null && storageManager.getStore(storeName).isDefined) {
+        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
+      } else if (sideInputStorageManager != null && sideInputStorageManager.getStore(storeName) != null) {
+        sideInputStorageManager.getStore(storeName).asInstanceOf[KeyValueStore[_, _]]
+      } else {
+        null
+      }
+    })
+
   val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
-                                    storageManager, tableManager, jobModel, streamMetadataCache, timerExecutor)
+                                    kvStoreSupplier, tableManager, jobModel, streamMetadataCache, timerExecutor)
 
  // store the (ssp -> whether this ssp is caught up) mapping. "caught up"
  // means the same ssp in other taskInstances has the same offset as
@@ -85,7 +100,8 @@ class TaskInstance(
   def registerOffsets {
     debug("Registering offsets for taskName: %s" format taskName)
 
-    offsetManager.register(taskName, systemStreamPartitions)
+    val sspsToRegister = systemStreamPartitions -- sideInputSSPs
+    offsetManager.register(taskName, sspsToRegister)
   }
 
   def startStores {
@@ -96,6 +112,13 @@ class TaskInstance(
     } else {
       debug("Skipping storage manager initialization for taskName: %s" format taskName)
     }
+
+    if (sideInputStorageManager != null) {
+      debug("Starting side input storage manager for taskName: %s" format taskName)
+      sideInputStorageManager.init()
+    } else {
+      debug("Skipping side input storage manager initialization for taskName: %s" format taskName)
+    }
   }
 
   def startTableManager {
@@ -128,14 +151,14 @@ class TaskInstance(
     debug("Registering consumers for taskName: %s" format taskName)
 
     systemStreamPartitions.foreach(systemStreamPartition => {
-      val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition)
-      .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
-      consumerMultiplexer.register(systemStreamPartition, offset)
-      metrics.addOffsetGauge(systemStreamPartition, () => {
-        offsetManager
-          .getLastProcessedOffset(taskName, systemStreamPartition)
-          .orNull
-      })
+      val startingOffset = getStartingOffset(systemStreamPartition)
+      consumerMultiplexer.register(systemStreamPartition, startingOffset)
+      metrics.addOffsetGauge(systemStreamPartition, () =>
+        if (sideInputSSPs.contains(systemStreamPartition)) {
+          sideInputStorageManager.getLastProcessedOffset(systemStreamPartition)
+        } else {
+          offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull
+        })
     })
   }
 
@@ -143,31 +166,37 @@ class TaskInstance(
     callbackFactory: TaskCallbackFactory = null) {
     metrics.processes.inc
 
-    if (!ssp2CaughtupMapping.getOrElse(envelope.getSystemStreamPartition,
-      throw new SamzaException(envelope.getSystemStreamPartition + " is not registered!"))) {
+    val incomingMessageSsp = envelope.getSystemStreamPartition
+
+    if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp,
+      throw new SamzaException(incomingMessageSsp + " is not registered!"))) {
       checkCaughtUp(envelope)
     }
 
-    if (ssp2CaughtupMapping(envelope.getSystemStreamPartition)) {
+    if (ssp2CaughtupMapping(incomingMessageSsp)) {
       metrics.messagesActuallyProcessed.inc
 
       trace("Processing incoming message envelope for taskName and SSP: %s, %s"
-        format (taskName, envelope.getSystemStreamPartition))
+        format (taskName, incomingMessageSsp))
 
-      if (isAsyncTask) {
-        exceptionHandler.maybeHandle {
-          val callback = callbackFactory.createCallback()
-          task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
-        }
+      if (sideInputSSPs.contains(incomingMessageSsp)) {
+        sideInputStorageManager.process(envelope)
       } else {
-        exceptionHandler.maybeHandle {
-         task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
-        }
+        if (isAsyncTask) {
+          exceptionHandler.maybeHandle {
+            val callback = callbackFactory.createCallback()
+            task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
+          }
+        } else {
+          exceptionHandler.maybeHandle {
+            task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
+          }
 
-        trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
-          format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
+          trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+            format (taskName, incomingMessageSsp, envelope.getOffset))
 
-        offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
+          offsetManager.update(taskName, incomingMessageSsp, envelope.getOffset)
+        }
       }
     }
   }
@@ -217,8 +246,12 @@ class TaskInstance(
       storageManager.flush
     }
 
-    trace("Checkpointing offsets for taskName: %s" format taskName)
+    trace("Flushing side input stores for taskName: %s" format taskName)
+    if (sideInputStorageManager != null) {
+      sideInputStorageManager.flush()
+    }
 
+    trace("Checkpointing offsets for taskName: %s" format taskName)
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
     if (checkpoint != null) {
@@ -249,6 +282,13 @@ class TaskInstance(
     } else {
       debug("Skipping storage manager shutdown for taskName: %s" format taskName)
     }
+
+    if (sideInputStorageManager != null) {
+      debug("Shutting down side input storage manager for taskName: %s" format taskName)
+      sideInputStorageManager.stop()
+    } else {
+      debug("Skipping side input storage manager shutdown for taskName: %s" format taskName)
+    }
   }
 
   def shutdownTableManager {
@@ -272,27 +312,29 @@ class TaskInstance(
   * it's already caught up.
    */
   private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
+    val incomingMessageSsp = envelope.getSystemStreamPartition
+
     if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
-      ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+      ssp2CaughtupMapping(incomingMessageSsp) = true
     } else {
       systemAdmins match {
         case null => {
           warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
-          ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+          ssp2CaughtupMapping(incomingMessageSsp) = true
         }
         case others => {
-          val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
-              .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
-          val system = envelope.getSystemStreamPartition.getSystem
+          val startingOffset = getStartingOffset(incomingMessageSsp)
+
+          val system = incomingMessageSsp.getSystem
           others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match {
             case null => {
               info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
-              ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
+              ssp2CaughtupMapping(incomingMessageSsp) = true // not comparable
             }
             case result => {
               if (result >= 0) {
-                info(envelope.getSystemStreamPartition.toString + " is catched up.")
-                ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+                info(incomingMessageSsp.toString + " is caught up.")
+                ssp2CaughtupMapping(incomingMessageSsp) = true
               }
             }
           }
@@ -300,4 +342,18 @@ class TaskInstance(
       }
     }
   }
+
+  private def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
+    val offset =
+      if (sideInputSSPs.contains(systemStreamPartition)) {
+        Option(sideInputStorageManager.getStartingOffset(systemStreamPartition))
+      } else {
+        offsetManager.getStartingOffset(taskName, systemStreamPartition)
+      }
+
+    val startingOffset = offset.getOrElse(
+      throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
+
+    startingOffset
+  }
 }
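
The starting-offset resolution above now forks on whether the SSP is a side input. A minimal
standalone sketch of that resolution in isolation, assuming only the two lookups visible in this
diff (the side input manager returns a nullable String, the offset manager an Option[String]);
the function-typed parameters are illustrative stand-ins for the two managers:

    import org.apache.samza.SamzaException
    import org.apache.samza.system.SystemStreamPartition

    def resolveStartingOffset(ssp: SystemStreamPartition,
                              sideInputSSPs: Set[SystemStreamPartition],
                              sideInputOffset: SystemStreamPartition => String,          // may be null
                              checkpointedOffset: SystemStreamPartition => Option[String]): String = {
      val offset =
        if (sideInputSSPs.contains(ssp)) Option(sideInputOffset(ssp))  // wrap the nullable result
        else checkpointedOffset(ssp)
      offset.getOrElse(
        throw new SamzaException("No offset defined for SystemStreamPartition: %s" format ssp))
    }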

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 62b59fb..90fdc19 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -28,8 +28,6 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.system._
 import org.apache.samza.util.{Clock, FileUtil, Logging}
 
-import scala.collection.JavaConverters._
-
 object TaskStorageManager {
   def getStoreDir(storeBaseDir: File, storeName: String) = {
     new File(storeBaseDir, storeName)
@@ -70,7 +68,7 @@ class TaskStorageManager(
   val fileOffsets: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
   val offsetFileName = "OFFSET"
 
-  def apply(storageEngineName: String) = taskStores(storageEngineName)
+  def getStore(storeName: String): Option[StorageEngine] = taskStores.get(storeName)
 
   def init {
     cleanBaseDirs()
@@ -101,7 +99,7 @@ class TaskStorageManager(
         info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString)
         FileUtil.rm(loggedStorePartitionDir)
       } else {
-        val offset = readOffsetFile(loggedStorePartitionDir)
+        val offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, offsetFileName)
         info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
         if (offset != null) {
           fileOffsets.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
@@ -111,7 +109,7 @@ class TaskStorageManager(
   }
 
   /**
-    * Directory {@code loggedStoreDir} associated with the logged store {@code storeName} is valid,
+    * Directory loggedStoreDir associated with the logged store storeName is valid
     * if all of the following conditions are true.
     * a) If the store has to be persisted to disk.
     * b) If there is a valid offset file associated with the logged store.
@@ -120,55 +118,12 @@ class TaskStorageManager(
     * @return true if the logged store is valid, false otherwise.
     */
   private def isLoggedStoreValid(storeName: String, loggedStoreDir: File): Boolean = {
-    val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs.getOrElse(storeName,
-                                                                               StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
-    persistedStores.contains(storeName) && isOffsetFileValid(loggedStoreDir) &&
-      !isStaleLoggedStore(loggedStoreDir, changeLogDeleteRetentionInMs)
-  }
+    val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs
+      .getOrElse(storeName, StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
 
-  /**
-    * Determines if the logged store directory {@code loggedStoreDir} is stale. A store is stale if the following condition is true.
-    *
-    *  ((CurrentTime) - (LastModifiedTime of the Offset file) is greater than the changelog's tombstone retention).
-    *
-    * @param loggedStoreDir the base directory of the local change-logged store.
-    * @param changeLogDeleteRetentionInMs the delete retention of the changelog in milli seconds.
-    * @return true if the store is stale, false otherwise.
-    *
-    */
-  private def isStaleLoggedStore(loggedStoreDir: File, changeLogDeleteRetentionInMs: Long): Boolean = {
-    var isStaleStore = false
-    val storePath = loggedStoreDir.toPath.toString
-    if (loggedStoreDir.exists()) {
-      val offsetFileRef = new File(loggedStoreDir, offsetFileName)
-      val offsetFileLastModifiedTime = offsetFileRef.lastModified()
-      if ((clock.currentTimeMillis() - offsetFileLastModifiedTime) >= changeLogDeleteRetentionInMs) {
-        info ("Store: %s is stale since lastModifiedTime of offset file: %s, " +
-          "is older than changelog deleteRetentionMs: %s." format(storePath, offsetFileLastModifiedTime, changeLogDeleteRetentionInMs))
-        isStaleStore = true
-      }
-    } else {
-      info("Logged storage partition directory: %s does not exist." format storePath)
-    }
-    isStaleStore
-  }
-
-  /**
-    * An offset file associated with logged store {@code loggedStoreDir} is valid if it exists and is not empty.
-    *
-    * @return true if the offset file is valid. false otherwise.
-    */
-  private def isOffsetFileValid(loggedStoreDir: File): Boolean = {
-    var hasValidOffsetFile = false
-    if (loggedStoreDir.exists()) {
-      val offsetContents = readOffsetFile(loggedStoreDir)
-      if (offsetContents != null && !offsetContents.isEmpty) {
-        hasValidOffsetFile = true
-      } else {
-        info("Offset file is not valid for store: %s." format loggedStoreDir.toPath.toString)
-      }
-    }
-    hasValidOffsetFile
+    persistedStores.contains(storeName) &&
+      StorageManagerUtil.isOffsetFileValid(loggedStoreDir, offsetFileName) &&
+      !StorageManagerUtil.isStaleStore(loggedStoreDir, offsetFileName, changeLogDeleteRetentionInMs, clock.currentTimeMillis())
   }
 
   private def setupBaseDirs() {
@@ -187,24 +142,6 @@ class TaskStorageManager(
     }
   }
 
-  /**
-    * Read and return the contents of the offset file.
-    *
-    * @param loggedStoragePartitionDir the base directory of the store
-    * @return the content of the offset file if it exists for the store, null otherwise.
-    */
-  private def readOffsetFile(loggedStoragePartitionDir: File): String = {
-    var offset : String = null
-    val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
-    if (offsetFileRef.exists()) {
-      info("Found offset file in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
-      offset = FileUtil.readWithChecksum(offsetFileRef)
-    } else {
-      info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
-    }
-    offset
-  }
-
   private def validateChangelogStreams() = {
     info("Validating change log streams: " + changeLogSystemStreams)
 
@@ -262,22 +199,7 @@ class TaskStorageManager(
       .getOrElse(systemStreamPartition.getSystemStream,
         throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))
 
-    if (fileOffset != null) {
-      // File offset was the last message written to the changelog that is also reflected in the store,
-      // so we start with the NEXT offset
-      val resumeOffset = admin.getOffsetsAfter(Map(systemStreamPartition -> fileOffset).asJava).get(systemStreamPartition)
-      if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
-        resumeOffset
-      } else {
-        // If the offset we plan to use is older than the oldest offset, just use the oldest offset.
-        // This can happen with changelogs configured with a TTL cleanup policy
-        warn(s"Local store offset $resumeOffset is lower than the oldest offset $oldestOffset of the changelog. " +
-          s"The values between these offsets cannot be restored.")
-        oldestOffset
-      }
-    } else {
-      oldestOffset
-    }
+    StorageManagerUtil.getStartingOffset(systemStreamPartition, admin, fileOffset, oldestOffset)
   }
 
   private def restoreStores() {
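
For reference, the resume logic that moved into StorageManagerUtil.getStartingOffset is the
inline version deleted above; a compact restatement, assuming the four-argument shape used at
the call site:

    import scala.collection.JavaConverters._
    import org.apache.samza.system.{SystemAdmin, SystemStreamPartition}

    def startingOffset(ssp: SystemStreamPartition, admin: SystemAdmin,
                       fileOffset: String, oldestOffset: String): String = {
      if (fileOffset != null) {
        // The file offset is the last changelog message already reflected in the store,
        // so restoration resumes from the NEXT offset.
        val resumeOffset = admin.getOffsetsAfter(Map(ssp -> fileOffset).asJava).get(ssp)
        if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
          resumeOffset
        } else {
          // The resume point aged out of the changelog (e.g. a TTL cleanup policy);
          // fall back to the oldest offset still available.
          oldestOffset
        }
      } else {
        oldestOffset
      }
    }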

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index 4b93543..46a2089 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -33,7 +33,7 @@ object FileUtil {
     * @param file The file handle to write to
     * @param data The data to be written to the file
     * */
-  def writeWithChecksum(file: File, data: String) = {
+  def writeWithChecksum(file: File, data: String): Unit = {
     val checksum = getChecksum(data)
     var oos: ObjectOutputStream = null
     var fos: FileOutputStream = null
@@ -52,7 +52,7 @@ object FileUtil {
     * Reads from a file that has a checksum prepended to the data
     * @param file The file handle to read from
     * */
-  def readWithChecksum(file: File) = {
+  def readWithChecksum(file: File): String = {
     var fis: FileInputStream = null
     var ois: ObjectInputStream = null
     try {
@@ -76,7 +76,7 @@ object FileUtil {
     * Recursively remove a directory (or file), and all sub-directories. Equivalent
     * to rm -rf.
     */
-  def rm(file: File) {
+  def rm(file: File): Unit = {
     if (file == null) {
       return
     } else if (file.isDirectory) {
@@ -96,7 +96,7 @@ object FileUtil {
     * @param data The string for which checksum has to be generated
     * @return long type value representing the checksum
     * */
-  def getChecksum(data: String) = {
+  def getChecksum(data: String): Long = {
     val crc = new CRC32
     crc.update(data.getBytes)
     crc.getValue
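
A quick round-trip through the now explicitly typed helpers, e.g. for the OFFSET files handled
above (the path is illustrative):

    import java.io.File
    import org.apache.samza.util.FileUtil

    val storeDir = new File("/tmp/my-store")             // illustrative location
    storeDir.mkdirs()
    val offsetFile = new File(storeDir, "OFFSET")
    FileUtil.writeWithChecksum(offsetFile, "100")        // a CRC32 checksum is prepended
    val restored: String = FileUtil.readWithChecksum(offsetFile)
    assert(restored == "100")
    FileUtil.rm(storeDir)                                // rm -rf semantics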

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
index f3ba746..a359cd5 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
@@ -21,6 +21,8 @@
 
 package org.apache.samza.util
 
+import java.util.function
+
 import scala.collection.immutable.Map
 import scala.collection.JavaConverters._
 import scala.runtime.AbstractFunction0
@@ -47,6 +49,12 @@ object ScalaJavaUtil {
     }
   }
 
+  def toJavaFunction[T, R](scalaFunction: Function1[T, R]): java.util.function.Function[T, R] = {
+    new function.Function[T, R] {
+      override def apply(t: T): R = scalaFunction.apply(t)
+    }
+  }
+
   /**
    * Wraps the provided Java Supplier in a Scala Function, e.g. for use in [[Option#getOrDefault]]
     *
@@ -59,4 +67,8 @@ object ScalaJavaUtil {
       override def apply(): T = javaFunction.get()
     }
   }
+
+  def toScalaFunction[T, R](javaFunction: java.util.function.Function[T, R]): Function1[T, R] = {
+    t => javaFunction.apply(t)
+  }
 }
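
These converters are what let the Scala-side kvStoreSupplier in TaskInstance be handed to the
Java TaskContextImpl; a small usage sketch:

    import org.apache.samza.util.ScalaJavaUtil

    val javaFn: java.util.function.Function[String, Int] =
      ScalaJavaUtil.toJavaFunction((s: String) => s.length)
    javaFn.apply("side-input")                           // 10

    val scalaFn: String => Int = ScalaJavaUtil.toScalaFunction(javaFn)
    scalaFn("stores")                                    // 6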

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 059eb03..c9534bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -87,6 +87,26 @@ object Util extends Logging {
   }
 
   /**
+    * Gets the [[SystemStream]] corresponding to the provided stream, which may be
+    * a streamId, or a stream name of the format systemName.streamName.
+    *
+    * @param config the config used to resolve a streamId to its system and physical name
+    * @param stream the stream name or id to get the [[SystemStream]] for.
+    * @return the [[SystemStream]] for the stream
+    */
+  def getSystemStreamFromNameOrId(config: Config, stream: String): SystemStream = {
+    val parts = stream.split(".")
+    if (parts.length == 0 || parts.length > 2) {
+      throw new SamzaException(
+        String.format("Invalid stream %s. Expected to be of the format streamId or systemName.streamName", stream))
+    }
+    if (parts.length == 1) {
+      new StreamConfig(config).streamIdToSystemStream(stream)
+    } else {
+      new SystemStream(parts(0), parts(1))
+    }
+  }
+
+  /**
    * Returns the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback
    *
    * @return the [[java.net.InetAddress]] which represents the localhost
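
Both accepted inputs to the new helper, sketched against an illustrative config (the
streams.*.samza.system / streams.*.samza.physical.name keys are assumed here to bind the
streamId; adjust to the actual stream configuration):

    import scala.collection.JavaConverters._
    import org.apache.samza.config.MapConfig
    import org.apache.samza.util.Util

    val config = new MapConfig(Map(
      "streams.page-views.samza.system" -> "kafka",
      "streams.page-views.samza.physical.name" -> "PageViews").asJava)

    Util.getSystemStreamFromNameOrId(config, "kafka.PageViews") // explicit systemName.streamName form
    Util.getSystemStreamFromNameOrId(config, "page-views")      // streamId form, resolved via StreamConfig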

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 88767f5..9cdbfe6 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -82,7 +82,7 @@ public class TestAsyncRunLoop {
     return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics,
         null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class),
         manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics,
-        new scala.collection.immutable.HashSet<String>()), null, null, null);
+        new scala.collection.immutable.HashSet<String>()), null, null, null, new scala.collection.immutable.HashSet<>(), null);
   }
 
   interface TestCode {

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 4ff7848..1672191 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -47,7 +47,6 @@ import org.mockito.Mockito
 import org.mockito.Mockito._
 import org.scalatest.Assertions.intercept
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.JavaConverters._
 
@@ -377,7 +376,7 @@ class TestTaskInstance {
     val mockOrder = inOrder(offsetManager, collector, storageManager)
 
     val taskInstance: TaskInstance = new TaskInstance(
-      Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+      Mockito.mock(classOf[StreamTask]),
       taskName,
       new MapConfig,
       new TaskInstanceMetrics,
@@ -418,7 +417,7 @@ class TestTaskInstance {
     val offsetManager = Mockito.mock(classOf[OffsetManager])
 
     val taskInstance: TaskInstance = new TaskInstance(
-      Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask],
+      Mockito.mock(classOf[StreamTask]),
       taskName,
       new MapConfig,
       new TaskInstanceMetrics,

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 3bb4e99..0b945cb 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -151,7 +151,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val ss = new SystemStream("kafka", "testStream")
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName)
+    val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName)
 
     val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
 
@@ -182,12 +182,12 @@ class TestTaskStorageManager extends MockitoSugar {
     taskManager.init
 
     // Verify that the store directory doesn't have ANY files
-    assertNull(storeDirectory.listFiles())
+    assertTrue(storeDirectory.list().isEmpty)
     verify(mockSystemConsumer).register(ssp, "0")
 
     // Test 2: flush should NOT create/update the offset file. Store directory has no files
     taskManager.flush()
-    assertNull(storeDirectory.listFiles())
+    assertTrue(storeDirectory.list().isEmpty)
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file is NOT created
     metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -197,7 +197,7 @@ class TestTaskStorageManager extends MockitoSugar {
     })
     when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata))
     taskManager.stop()
-    assertNull(storeDirectory.listFiles())
+    assertTrue(storeDirectory.list().isEmpty)
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the earliest offset
     metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -209,7 +209,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     taskManager.init
 
-    assertNull(storeDirectory.listFiles())
+    assertTrue(storeDirectory.list().isEmpty)
     // second time to register; make sure it starts from beginning
     verify(mockSystemConsumer, times(2)).register(ssp, "0")
   }
@@ -223,7 +223,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addStore(store, false)
-      .addStore(loggedStore, true)
+      .addLoggedStore(loggedStore, true)
       .build
 
     //Invoke test method
@@ -244,7 +244,7 @@ class TestTaskStorageManager extends MockitoSugar {
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
-      .addStore(loggedStore, true)
+      .addLoggedStore(loggedStore, true)
       .build
 
     val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs",
@@ -266,7 +266,7 @@ class TestTaskStorageManager extends MockitoSugar {
     FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
     val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
-      .addStore(loggedStore, true)
+      .addLoggedStore(loggedStore, true)
       .build
 
     val cleanDirMethod = taskStorageManager.getClass
@@ -285,7 +285,7 @@ class TestTaskStorageManager extends MockitoSugar {
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
-      .addStore(loggedStore, false)
+      .addLoggedStore(loggedStore, false)
       .build
 
     val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs",
@@ -647,6 +647,13 @@ class TaskStorageManagerBuilder extends MockitoSugar {
     addStore(storeName, mockStorageEngine, mock[SystemConsumer])
   }
 
+  def addLoggedStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
+    val mockStorageEngine = mock[StorageEngine]
+    when(mockStorageEngine.getStoreProperties)
+    .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(true).build())
+    addStore(storeName, mockStorageEngine, mock[SystemConsumer])
+  }
+
   def setPartition(p: Partition) = {
     partition = p
     this

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
index 2681fb3..2a9532b 100644
--- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
@@ -50,7 +50,8 @@ public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDesc
     Map<String, String> tableSpecConfig = new HashMap<>();
     generateTableSpecConfig(tableSpecConfig);
 
-    return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig);
+    return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig,
+        sideInputs, sideInputsProcessor);
   }
 
   private void addInMemoryConfig(Map<String, String> map, String key, String value) {

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
index 2c62159..33bfc84 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
@@ -182,7 +182,8 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr
     Map<String, String> tableSpecConfig = new HashMap<>();
     generateTableSpecConfig(tableSpecConfig);
 
-    return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig);
+    return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig,
+        sideInputs, sideInputsProcessor);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
index 1f9b57b..2d05f95 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
@@ -18,9 +18,13 @@
  */
 package org.apache.samza.storage.kv;
 
+import com.google.common.base.Preconditions;
+
+import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.storage.SideInputsProcessor;
 
 
 /**
@@ -32,6 +36,8 @@ import org.apache.samza.operators.BaseTableDescriptor;
  */
 abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
     extends BaseTableDescriptor<K, V, D> {
+  protected List<String> sideInputs;
+  protected SideInputsProcessor sideInputsProcessor;
 
   /**
    * Constructs a table descriptor instance
@@ -41,6 +47,16 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
     super(tableId);
   }
 
+  public D withSideInputs(List<String> sideInputs) {
+    this.sideInputs = sideInputs;
+    return (D) this;
+  }
+
+  public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
+    this.sideInputsProcessor = sideInputsProcessor;
+    return (D) this;
+  }
+
   @Override
   protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
     super.generateTableSpecConfig(tableSpecConfig);
@@ -51,6 +67,11 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo
    */
   protected void validate() {
     super.validate();
+    if (sideInputs != null || sideInputsProcessor != null) {
+      Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null,
+          String.format("Invalid side input configuration for table: %s. " +
+              "Both side inputs and the processor must be provided", tableId));
+    }
   }
 
 }
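
Putting the two fluent calls together; a sketch that assumes RocksDbTableDescriptor's
single-argument constructor and treats the new SideInputsProcessor as a single-method,
lambda-friendly (message, store) interface. The stream name and processor body are purely
illustrative:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.samza.storage.kv.Entry;
    import org.apache.samza.storage.kv.RocksDbTableDescriptor;

    RocksDbTableDescriptor<String, String> profiles =
        new RocksDbTableDescriptor<String, String>("profile-table")
            .withSideInputs(Arrays.asList("profile-updates"))
            // Turn each side-input envelope into a single put against the backing store.
            .withSideInputsProcessor((message, store) ->
                Collections.singletonList(new Entry<>(message.getKey(), message.getMessage())));

validate() enforces that the two settings travel together, so a descriptor with side inputs but
no processor (or vice versa) fails fast at validation time rather than at container start.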

http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index b494eba..cacfe95 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -19,9 +19,11 @@
 package org.apache.samza.storage.kv;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
@@ -30,6 +32,7 @@ import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.utils.SerdeUtils;
 import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +105,15 @@ abstract public class BaseLocalStoreBackedTableProvider implements TableProvider
     String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
     storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
 
+    List<String> sideInputs = tableSpec.getSideInputs();
+    if (sideInputs != null && !sideInputs.isEmpty()) {
+      String formattedSideInputs = String.join(",", sideInputs);
+
+      storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs);
+      storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()),
+          SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor()));
+    }
+
     return storeConfig;
   }
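
For a table id of profile-table with side input profile-updates, the store-scoped config built
above would carry entries shaped roughly as follows (the exact key formats live in the
JavaStorageConfig constants referenced here, so these shapes are assumptions):

    stores.profile-table.side.inputs = profile-updates
    stores.profile-table.side.inputs.processor.serialized.instance = <Base64-serialized SideInputsProcessor>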