You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/11 23:13:48 UTC

samza git commit: SAMZA-1771: Test Framework support for stateful testing and assertions over state

Repository: samza
Updated Branches:
  refs/heads/master cc490ea89 -> 5c5afb82a


SAMZA-1771: Test Framework support for stateful testing and assertions over state

- Gives StateAssert for stateful assertions
- Gives examples of testing stateful jobs for TaskApplication and StreamApplication

Author: Sanil15 <sa...@gmail.com>
Author: Sanil Jain <sn...@linkedin.com>
Author: Sanil Jain <sa...@gmail.com>

Reviewers: Shanthoosh Venkataraman <sv...@linkedin.com>, Prateek Maheshwari <pm...@apache.org>

Closes #683 from Sanil15/SAMZA-1771


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5c5afb82
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5c5afb82
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5c5afb82

Branch: refs/heads/master
Commit: 5c5afb82ab12c72a4b114b0858f77779285b86cf
Parents: cc490ea
Author: Sanil15 <sa...@gmail.com>
Authored: Thu Oct 11 16:13:45 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 11 16:13:45 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/test/framework/TestRunner.java | 45 ++++++++--
 .../system/InMemorySystemDescriptor.java        |  1 -
 .../AsyncStreamTaskIntegrationTest.java         |  3 +-
 .../StreamApplicationIntegrationTest.java       | 61 ++++++++++++-
 .../framework/StreamTaskIntegrationTest.java    | 93 ++++++++++++++++++++
 .../table/PageViewToProfileJoinFunction.java    |  2 +-
 .../table/TestLocalTableWithSideInputs.java     | 20 ++---
 7 files changed, 198 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index add3bf6..a3d8a0e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -20,6 +20,7 @@
 package org.apache.samza.test.framework;
 
 import com.google.common.base.Preconditions;
+import java.io.File;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,7 +30,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
@@ -61,7 +61,10 @@ import org.apache.samza.task.StreamTask;
 import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
-import org.junit.Assert;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * TestRunner provides APIs to set up integration tests for a Samza application.
@@ -80,6 +83,7 @@ import org.junit.Assert;
  *
  */
 public class TestRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
   private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
   private static final String JOB_NAME = "samza-test";
 
@@ -98,8 +102,14 @@ public class TestRunner {
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+    // Changing the base directory for non-changelog stores used by Samza application to separate the
+    // on-disk store locations for concurrently executing tests
+    configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
+        new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope).getAbsolutePath());
+    configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
+        new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope).getAbsolutePath());
     addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
-    // This is important because Table Api enables host affinity by default for RocksDb
+    // Disabling host affinity since it requires reading locality information from a Kafka coordinator stream
     addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString());
     addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope);
     addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig());
@@ -251,16 +261,19 @@ public class TestRunner {
    * @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode
    */
   public void run(Duration timeout) {
-    Preconditions.checkState(app != null,
-        "TestRunner should run for Low Level Task api or High Level Application Api");
+    Preconditions.checkNotNull(app);
     Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive");
+    // Cleaning store directories to ensure current run does not pick up state from previous run
+    deleteStoreDirectories();
     final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
     runner.run();
-    boolean timedOut = !runner.waitForFinish(timeout);
-    Assert.assertFalse("Timed out waiting for application to finish", timedOut);
+    if (!runner.waitForFinish(timeout)) {
+      throw new SamzaException("Timed out waiting for application to finish");
+    }
     ApplicationStatus status = runner.status();
+    deleteStoreDirectories();
     if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
-      throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
+      throw new SamzaException("Application could not finish successfully", status.getThrowable());
     }
   }
 
@@ -369,4 +382,20 @@ public class TestRunner {
             new EndOfStreamMessage(null)));
       });
   }
+
+  private void deleteStoreDirectories() {
+    Preconditions.checkNotNull(configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR()));
+    Preconditions.checkNotNull(configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR()));
+    deleteDirectory(configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR()));
+    deleteDirectory(configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR()));
+  }
+
+  private void deleteDirectory(String path) {
+    File dir = new File(path);
+    LOG.info("Deleting the directory " + path);
+    FileUtil.rm(dir);
+    if (dir.exists()) {
+      LOG.warn("Could not delete the directory " + path);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
index e6e423f..77948f6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
@@ -30,7 +30,6 @@ import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.config.JavaSystemConfig;
 
-
 /**
  * A descriptor for InMemorySystem.
  * System properties configured using a descriptor override corresponding properties provided in configuration.

http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index f1757ab..7696b62 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
@@ -145,7 +146,7 @@ public class AsyncStreamTaskIntegrationTest {
   /**
    * Job should fail because it times out too soon
    */
-  @Test(expected = AssertionError.class)
+  @Test(expected = SamzaException.class)
   public void testSamzaJobTimeoutFailureForAsyncTask() {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 7b9bad7..1dda302 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.samza.test.framework;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import org.apache.samza.SamzaException;
@@ -27,27 +28,60 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.table.Table;
 import org.apache.samza.test.controlmessages.TestData;
 import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.table.PageViewToProfileJoinFunction;
+import org.apache.samza.test.table.TestTableData;
 import org.junit.Assert;
 import org.junit.Test;
-
-import static org.apache.samza.test.controlmessages.TestData.PageView;
+import static org.apache.samza.test.controlmessages.TestData.*;
 
 public class StreamApplicationIntegrationTest {
+
   private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
 
   @Test
+  public void testStatefulJoinWithLocalTable() {
+    List<TestTableData.PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10));
+    List<TestTableData.Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10));
+
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+    InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd
+        .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>());
+
+    InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd
+        .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>())
+        .withBootstrap(true);
+
+    InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
+        .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+
+    TestRunner
+        .of(new PageViewProfileViewJoinApplication())
+        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(profileStreamDesc, profiles)
+        .addOutputStream(outputStreamDesc, 1)
+        .run(Duration.ofSeconds(2));
+
+    Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
+  }
+
+  @Test
   public void testHighLevelApi() throws Exception {
     Random random = new Random();
     int count = 10;
@@ -92,6 +126,29 @@ public class StreamApplicationIntegrationTest {
         .run(Duration.ofMillis(1000));
   }
 
+  private static class PageViewProfileViewJoinApplication implements StreamApplication {
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(
+          new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
+              KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
+
+      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+      KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
+
+      KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
+          ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+      OutputStream<TestTableData.EnrichedPageView> outputStream = appDesc.getOutputStream(enrichedPageViewOSD);
+      appDesc.getInputStream(pageViewISD)
+          .partitionBy(TestTableData.PageView::getMemberId,  pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(
+              TestTableData.PageView.class)), "p1")
+          .join(table, new PageViewToProfileJoinFunction())
+          .sendTo(outputStream);
+    }
+  }
+
   private static class PageViewFilterApplication implements StreamApplication {
     @Override
     public void describe(StreamApplicationDescriptor appDesc) {

http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 55021d3..f778704 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -27,19 +27,68 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.table.TestTableData;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
 import org.junit.Test;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 
 
 public class StreamTaskIntegrationTest {
 
   @Test
+  public void testStatefulTaskWithLocalTable() {
+    List<PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10));
+    List<Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10));
+
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+    InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd
+        .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>());
+
+    InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd
+        .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>())
+        .withBootstrap(true);
+
+    InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
+        .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+
+    TestRunner
+        .of(new JoinTaskApplication())
+        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(profileStreamDesc, profiles)
+        .addOutputStream(outputStreamDesc, 1)
+        .run(Duration.ofSeconds(2));
+
+    Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
+  }
+
+  @Test
   public void testSyncTaskWithSinglePartition() throws Exception {
     List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
@@ -155,6 +204,49 @@ public class StreamTaskIntegrationTest {
     StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
   }
 
+  static public class JoinTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDesc) {
+      appDesc.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask());
+      appDesc.addTable(new InMemoryTableDescriptor("profile-view-store",
+          KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
+      KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+      KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.addInputStream(profileISD);
+      KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.addInputStream(pageViewISD);
+      KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD =
+          ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+      appDesc.addOutputStream(enrichedPageViewOSD);
+    }
+  }
+
+  static public class StatefulStreamTask implements StreamTask, InitableTask {
+    private ReadWriteTable<Integer, Profile> profileViewTable;
+
+    @Override
+    public void init(Context context) throws Exception {
+      profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+    }
+
+    @Override
+    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+      if (message.getMessage() instanceof Profile) {
+        Profile profile = (Profile) message.getMessage();
+        profileViewTable.put(profile.getMemberId(), profile);
+      } else if (message.getMessage() instanceof PageView) {
+        PageView pageView = (PageView) message.getMessage();
+        Profile profile = profileViewTable.get(pageView.getMemberId());
+        if (profile != null) {
+          System.out.println("Joining Page View ArticleView by " + profile.getMemberId());
+          collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "EnrichedPageView"), null, null,
+              new TestTableData.EnrichedPageView(pageView.getPageKey(), pageView.getMemberId(), profile.getCompany())));
+        }
+      }
+    }
+
+  }
+
   public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) {
     List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
@@ -167,4 +259,5 @@ public class StreamTaskIntegrationTest {
       expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
     }
   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
index d253284..6b7bded 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
@@ -27,7 +27,7 @@ import org.apache.samza.test.table.TestTableData.Profile;
 /**
  * A {@link StreamTableJoinFunction} used by unit tests in this package
  */
-class PageViewToProfileJoinFunction implements StreamTableJoinFunction
+public class PageViewToProfileJoinFunction implements StreamTableJoinFunction
     <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 2fa00fe..4410b87 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -20,10 +20,15 @@
 package org.apache.samza.test.table;
 
 import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.KV;
@@ -43,14 +48,6 @@ import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.junit.Test;
 
-import java.nio.file.FileSystems;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
@@ -87,11 +84,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     Map<String, String> configs = new HashMap<>();
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
-    configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
-        FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString());
-    // SideInput Tables needs this to be configured for persisting data
-    configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
-        FileSystems.getDefault().getPath("logged").toAbsolutePath().toString());
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
 
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);