You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/11 23:13:48 UTC
samza git commit: SAMZA-1771: Test Framework support for stateful
testing and assertions over state
Repository: samza
Updated Branches:
refs/heads/master cc490ea89 -> 5c5afb82a
SAMZA-1771: Test Framework support for stateful testing and assertions over state
- Gives StateAssert for stateful assertions
- Gives examples of Testing stateful jobs for TaskApplication and StreamApplication
Author: Sanil15 <sa...@gmail.com>
Author: Sanil Jain <sn...@linkedin.com>
Author: Sanil Jain <sa...@gmail.com>
Reviewers: Shanthoosh Venkataraman <sv...@linkedin.com>, Prateek Maheshwari <pm...@apache.org>
Closes #683 from Sanil15/SAMZA-1771
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5c5afb82
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5c5afb82
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5c5afb82
Branch: refs/heads/master
Commit: 5c5afb82ab12c72a4b114b0858f77779285b86cf
Parents: cc490ea
Author: Sanil15 <sa...@gmail.com>
Authored: Thu Oct 11 16:13:45 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 11 16:13:45 2018 -0700
----------------------------------------------------------------------
.../apache/samza/test/framework/TestRunner.java | 45 ++++++++--
.../system/InMemorySystemDescriptor.java | 1 -
.../AsyncStreamTaskIntegrationTest.java | 3 +-
.../StreamApplicationIntegrationTest.java | 61 ++++++++++++-
.../framework/StreamTaskIntegrationTest.java | 93 ++++++++++++++++++++
.../table/PageViewToProfileJoinFunction.java | 2 +-
.../table/TestLocalTableWithSideInputs.java | 20 ++---
7 files changed, 198 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index add3bf6..a3d8a0e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -20,6 +20,7 @@
package org.apache.samza.test.framework;
import com.google.common.base.Preconditions;
+import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,7 +30,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
@@ -61,7 +61,10 @@ import org.apache.samza.task.StreamTask;
import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
-import org.junit.Assert;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* TestRunner provides APIs to set up integration tests for a Samza application.
@@ -80,6 +83,7 @@ import org.junit.Assert;
*
*/
public class TestRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
private static final String JOB_NAME = "samza-test";
@@ -98,8 +102,14 @@ public class TestRunner {
configs.put(JobConfig.PROCESSOR_ID(), "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+ // Changing the base directory for non-changelog stores used by Samza application to separate the
+ // on-disk store locations for concurrently executing tests
+ configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
+ new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope).getAbsolutePath());
+ configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
+ new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope).getAbsolutePath());
addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
- // This is important because Table Api enables host affinity by default for RocksDb
+ // Disabling host affinity since it requires reading locality information from a Kafka coordinator stream
addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString());
addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope);
addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig());
@@ -251,16 +261,19 @@ public class TestRunner {
* @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode
*/
public void run(Duration timeout) {
- Preconditions.checkState(app != null,
- "TestRunner should run for Low Level Task api or High Level Application Api");
+ Preconditions.checkNotNull(app);
Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive");
+ // Cleaning store directories to ensure current run does not pick up state from previous run
+ deleteStoreDirectories();
final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs));
runner.run();
- boolean timedOut = !runner.waitForFinish(timeout);
- Assert.assertFalse("Timed out waiting for application to finish", timedOut);
+ if (!runner.waitForFinish(timeout)) {
+ throw new SamzaException("Timed out waiting for application to finish");
+ }
ApplicationStatus status = runner.status();
+ deleteStoreDirectories();
if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
- throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
+ throw new SamzaException("Application could not finish successfully", status.getThrowable());
}
}
@@ -369,4 +382,20 @@ public class TestRunner {
new EndOfStreamMessage(null)));
});
}
+
+ private void deleteStoreDirectories() {
+ Preconditions.checkNotNull(configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR()));
+ Preconditions.checkNotNull(configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR()));
+ deleteDirectory(configs.get(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR()));
+ deleteDirectory(configs.get(JobConfig.JOB_LOGGED_STORE_BASE_DIR()));
+ }
+
+ private void deleteDirectory(String path) {
+ File dir = new File(path);
+ LOG.info("Deleting the directory " + path);
+ FileUtil.rm(dir);
+ if (dir.exists()) {
+ LOG.warn("Could not delete the directory " + path);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
index e6e423f..77948f6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
@@ -30,7 +30,6 @@ import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.config.JavaSystemConfig;
-
/**
* A descriptor for InMemorySystem.
* System properties configured using a descriptor override corresponding properties provided in configuration.
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index f1757ab..7696b62 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
@@ -145,7 +146,7 @@ public class AsyncStreamTaskIntegrationTest {
/**
* Job should fail because it times out too soon
*/
- @Test(expected = AssertionError.class)
+ @Test(expected = SamzaException.class)
public void testSamzaJobTimeoutFailureForAsyncTask() {
InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 7b9bad7..1dda302 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.samza.test.framework;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.samza.SamzaException;
@@ -27,27 +28,60 @@ import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.table.Table;
import org.apache.samza.test.controlmessages.TestData;
import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.table.PageViewToProfileJoinFunction;
+import org.apache.samza.test.table.TestTableData;
import org.junit.Assert;
import org.junit.Test;
-
-import static org.apache.samza.test.controlmessages.TestData.PageView;
+import static org.apache.samza.test.controlmessages.TestData.*;
public class StreamApplicationIntegrationTest {
+
private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
@Test
+ public void testStatefulJoinWithLocalTable() {
+ List<TestTableData.PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10));
+ List<TestTableData.Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10));
+
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd
+ .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>());
+
+ InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd
+ .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>())
+ .withBootstrap(true);
+
+ InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
+ .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+
+ TestRunner
+ .of(new PageViewProfileViewJoinApplication())
+ .addInputStream(pageViewStreamDesc, pageViews)
+ .addInputStream(profileStreamDesc, profiles)
+ .addOutputStream(outputStreamDesc, 1)
+ .run(Duration.ofSeconds(2));
+
+ Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
+ }
+
+ @Test
public void testHighLevelApi() throws Exception {
Random random = new Random();
int count = 10;
@@ -92,6 +126,29 @@ public class StreamApplicationIntegrationTest {
.run(Duration.ofMillis(1000));
}
+ private static class PageViewProfileViewJoinApplication implements StreamApplication {
+ @Override
+ public void describe(StreamApplicationDescriptor appDesc) {
+ Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(
+ new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
+ KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
+
+ KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+ KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+ appDesc.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
+
+ KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
+ ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+ OutputStream<TestTableData.EnrichedPageView> outputStream = appDesc.getOutputStream(enrichedPageViewOSD);
+ appDesc.getInputStream(pageViewISD)
+ .partitionBy(TestTableData.PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(
+ TestTableData.PageView.class)), "p1")
+ .join(table, new PageViewToProfileJoinFunction())
+ .sendTo(outputStream);
+ }
+ }
+
private static class PageViewFilterApplication implements StreamApplication {
@Override
public void describe(StreamApplicationDescriptor appDesc) {
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 55021d3..f778704 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -27,19 +27,68 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.TaskApplicationDescriptor;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
+import org.apache.samza.test.table.TestTableData;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
public class StreamTaskIntegrationTest {
@Test
+ public void testStatefulTaskWithLocalTable() {
+ List<PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10));
+ List<Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10));
+
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd
+ .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>());
+
+ InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd
+ .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>())
+ .withBootstrap(true);
+
+ InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
+ .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+
+ TestRunner
+ .of(new JoinTaskApplication())
+ .addInputStream(pageViewStreamDesc, pageViews)
+ .addInputStream(profileStreamDesc, profiles)
+ .addOutputStream(outputStreamDesc, 1)
+ .run(Duration.ofSeconds(2));
+
+ Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, Duration.ofSeconds(1)).get(0).size());
+ }
+
+ @Test
public void testSyncTaskWithSinglePartition() throws Exception {
List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
@@ -155,6 +204,49 @@ public class StreamTaskIntegrationTest {
StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
}
+ static public class JoinTaskApplication implements TaskApplication {
+ @Override
+ public void describe(TaskApplicationDescriptor appDesc) {
+ appDesc.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask());
+ appDesc.addTable(new InMemoryTableDescriptor("profile-view-store",
+ KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
+ KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+ KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+ appDesc.addInputStream(profileISD);
+ KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ appDesc.addInputStream(pageViewISD);
+ KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD =
+ ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+ appDesc.addOutputStream(enrichedPageViewOSD);
+ }
+ }
+
+ static public class StatefulStreamTask implements StreamTask, InitableTask {
+ private ReadWriteTable<Integer, Profile> profileViewTable;
+
+ @Override
+ public void init(Context context) throws Exception {
+ profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+ }
+
+ @Override
+ public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+ if (message.getMessage() instanceof Profile) {
+ Profile profile = (Profile) message.getMessage();
+ profileViewTable.put(profile.getMemberId(), profile);
+ } else if (message.getMessage() instanceof PageView) {
+ PageView pageView = (PageView) message.getMessage();
+ Profile profile = profileViewTable.get(pageView.getMemberId());
+ if (profile != null) {
+ System.out.println("Joining Page View ArticleView by " + profile.getMemberId());
+ collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "EnrichedPageView"), null, null,
+ new TestTableData.EnrichedPageView(pageView.getPageKey(), pageView.getMemberId(), profile.getCompany())));
+ }
+ }
+ }
+
+ }
+
public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) {
List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
@@ -167,4 +259,5 @@ public class StreamTaskIntegrationTest {
expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
}
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
index d253284..6b7bded 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
@@ -27,7 +27,7 @@ import org.apache.samza.test.table.TestTableData.Profile;
/**
* A {@link StreamTableJoinFunction} used by unit tests in this package
*/
-class PageViewToProfileJoinFunction implements StreamTableJoinFunction
+public class PageViewToProfileJoinFunction implements StreamTableJoinFunction
<Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5c5afb82/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 2fa00fe..4410b87 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -20,10 +20,15 @@
package org.apache.samza.test.table;
import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.operators.KV;
@@ -43,14 +48,6 @@ import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.junit.Test;
-import java.nio.file.FileSystems;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
import static org.apache.samza.test.table.TestTableData.PageView;
import static org.apache.samza.test.table.TestTableData.Profile;
@@ -87,11 +84,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
Map<String, String> configs = new HashMap<>();
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
- configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
- FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString());
- // SideInput Tables needs this to be configured for persisting data
- configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
- FileSystems.getDefault().getPath("logged").toAbsolutePath().toString());
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);