You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/10 22:34:25 UTC
[2/5] samza git commit: SAMZA-1714: Creating shared context factory for shared context objects
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
index efe1acf..f587885 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -19,13 +19,12 @@
package org.apache.samza.table.remote;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.samza.container.SamzaContainerContext;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
@@ -34,18 +33,22 @@ import org.apache.samza.table.TableSpec;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+
import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class TestRemoteTableDescriptor {
@@ -117,16 +120,24 @@ public class TestRemoteTableDescriptor {
desc.getTableSpec();
}
- private TaskContext createMockTaskContext() {
+ private Context createMockContext() {
+ Context context = new MockContext();
+
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
- TaskContext taskContext = mock(TaskContext.class);
- doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
- SamzaContainerContext containerCtx = new SamzaContainerContext(
- "1", null, Collections.singleton(new TaskName("MyTask")), null);
- doReturn(containerCtx).when(taskContext).getSamzaContainerContext();
- return taskContext;
+ doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+
+ TaskName taskName = new TaskName("MyTask");
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(taskName);
+ when(context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+
+ ContainerModel containerModel = mock(ContainerModel.class);
+ when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
+ when(context.getContainerContext().getContainerModel()).thenReturn(containerModel);
+
+ return context;
}
static class CountingCreditFunction<K, V> implements TableRateLimiter.CreditFunction<K, V> {
@@ -172,7 +183,7 @@ public class TestRemoteTableDescriptor {
TableSpec spec = desc.getTableSpec();
RemoteTableProvider provider = new RemoteTableProvider(spec);
- provider.init(mock(SamzaContainerContext.class), createMockTaskContext());
+ provider.init(createMockContext());
Table table = provider.getTable();
Assert.assertTrue(table instanceof RemoteReadWriteTable);
RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 9dd5a74..050ea55 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -29,19 +29,16 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.samza.container.SamzaContainerContext;
+import junit.framework.Assert;
+import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.Table;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
import org.apache.samza.table.remote.TestRemoteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
import org.junit.Test;
-import junit.framework.Assert;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast;
@@ -57,9 +54,8 @@ public class TestRetriableTableFunctions {
public TableMetricsUtil getMetricsUtil(String tableId) {
Table table = mock(Table.class);
- SamzaContainerContext cntCtx = mock(SamzaContainerContext.class);
- TaskContext taskCtx = TestRemoteTable.getMockTaskContext();
- return new TableMetricsUtil(cntCtx, taskCtx, table, tableId);
+ Context context = TestRemoteTable.getMockContext();
+ return new TableMetricsUtil(context, table, tableId);
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java b/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
index 1f71abd..13ce5f4 100644
--- a/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
@@ -20,19 +20,21 @@
package org.apache.samza.task;
import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
-public class IdentityStreamTask implements StreamTask , InitableTask {
+public class IdentityStreamTask implements StreamTask, InitableTask {
private int processedMessageCount = 0;
private int expectedMessageCount;
private String outputTopic;
private String outputSystem;
@Override
- public void init(Config config, TaskContext taskContext) throws Exception {
+ public void init(Context context) throws Exception {
+ Config config = context.getJobContext().getConfig();
this.expectedMessageCount = config.getInt("app.messageCount");
this.outputTopic = config.get("app.outputTopic", "output");
this.outputSystem = config.get("app.outputSystem", "test-system");
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 9cdbfe6..be8d344 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -28,17 +28,17 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.OffsetManager;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceExceptionHandler;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
@@ -47,13 +47,20 @@ import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.TestSystemConsumers;
import org.junit.Rule;
import org.junit.Test;
-
import org.junit.rules.Timeout;
import scala.Option;
import scala.collection.JavaConverters;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestAsyncRunLoop {
// Immutable objects shared by all test methods.
@@ -77,12 +84,31 @@ public class TestAsyncRunLoop {
private final IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(taskName);
TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap());
scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
- return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics,
- null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class),
- manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics,
- new scala.collection.immutable.HashSet<String>()), null, null, null, new scala.collection.immutable.HashSet<>(), null);
+ return new TaskInstance(task,
+ taskModel,
+ taskInstanceMetrics,
+ null,
+ consumers,
+ mock(TaskInstanceCollector.class),
+ manager,
+ null,
+ null,
+ null,
+ sspSet,
+ new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()),
+ null,
+ null,
+ null,
+ new scala.collection.immutable.HashSet<>(),
+ null,
+ mock(JobContext.class),
+ mock(ContainerContext.class),
+ Option.apply(null),
+ Option.apply(null));
}
interface TestCode {
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
index d0b820a..0538980 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.junit.Before;
import org.junit.Test;
@@ -64,7 +64,7 @@ public class TestAsyncStreamAdapter {
}
@Override
- public void init(Config config, TaskContext context) throws Exception {
+ public void init(Context context) throws Exception {
inited = true;
}
@@ -95,7 +95,7 @@ public class TestAsyncStreamAdapter {
TestCallbackListener listener = new TestCallbackListener();
TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L, 0L);
- taskAdaptor.init(null, null);
+ taskAdaptor.init(null);
assertTrue(task.inited);
taskAdaptor.processAsync(null, null, null, callback);
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
index e0da2e9..da137e6 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
@@ -19,12 +19,11 @@
package org.apache.samza.task;
-import org.junit.Test;
-
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index 1bc23d4..ab5e295 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -19,14 +19,17 @@
package org.apache.samza.task;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.util.Clock;
import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestStreamOperatorTask {
@@ -36,20 +39,19 @@ public class TestStreamOperatorTask {
}
@Test
- public void testCloseDuringInitializationErrors() {
- ContextManager mockContextManager = mock(ContextManager.class);
- StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mockContextManager);
-
- doThrow(new RuntimeException("Failed to initialize context manager"))
- .when(mockContextManager).init(any(), any());
-
+ public void testCloseDuringInitializationErrors() throws Exception {
+ Context context = mock(Context.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(context.getJobContext()).thenReturn(jobContext);
+ doThrow(new RuntimeException("Failed to get config")).when(jobContext).getConfig();
+ StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mock(Clock.class));
try {
- operatorTask.init(mock(Config.class), mock(TaskContext.class));
- operatorTask.close();
- } catch (Exception e) {
+ operatorTask.init(context);
+ } catch (RuntimeException e) {
if (e instanceof NullPointerException) {
fail("Unexpected null pointer exception");
}
}
+ operatorTask.close();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
index ef606c0..8559bb3 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
@@ -18,19 +18,22 @@
*/
package org.apache.samza.util;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -206,25 +209,14 @@ public class TestEmbeddedTaggedRateLimiter {
}
static void initRateLimiter(RateLimiter rateLimiter) {
- Config config = mock(Config.class);
- TaskContext taskContext = mock(TaskContext.class);
- SamzaContainerContext containerContext = mockSamzaContainerContext();
- when(taskContext.getSamzaContainerContext()).thenReturn(containerContext);
- rateLimiter.init(config, taskContext);
- }
-
- static SamzaContainerContext mockSamzaContainerContext() {
- try {
- Collection<String> taskNames = mock(Collection.class);
- when(taskNames.size()).thenReturn(NUMBER_OF_TASKS);
- SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
- Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames");
- taskNamesField.setAccessible(true);
- taskNamesField.set(containerContext, taskNames);
- taskNamesField.setAccessible(false);
- return containerContext;
- } catch (Exception ex) {
- throw new SamzaException(ex);
- }
+ Context context = new MockContext(mock(Config.class));
+ when(context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
+ ContainerModel containerModel = mock(ContainerModel.class);
+ Map<TaskName, TaskModel> tasks = IntStream.range(0, NUMBER_OF_TASKS)
+ .mapToObj(i -> new TaskName("task-" + i))
+ .collect(Collectors.toMap(Function.identity(), x -> mock(TaskModel.class)));
+ when(containerModel.getTasks()).thenReturn(tasks);
+ when(context.getContainerContext().getContainerModel()).thenReturn(containerModel);
+ rateLimiter.init(context);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 57c0bf0..a35366d 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -22,7 +22,8 @@ package org.apache.samza.container
import java.util
import java.util.concurrent.atomic.AtomicReference
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{Config, MapConfig}
+import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
@@ -46,7 +47,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
private val TASK_NAME = new TaskName("taskName")
@Mock
- private var containerContext: SamzaContainerContext = null
+ private var config: Config = null
@Mock
private var taskInstance: TaskInstance = null
@Mock
@@ -60,6 +61,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@Mock
private var metrics: SamzaContainerMetrics = null
@Mock
+ private var containerContext: ContainerContext = null
+ @Mock
+ private var applicationContainerContext: ApplicationContainerContext = null
+ @Mock
private var samzaContainerListener: SamzaContainerListener = null
private var samzaContainer: SamzaContainer = null
@@ -67,15 +72,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@Before
def setup(): Unit = {
MockitoAnnotations.initMocks(this)
- this.samzaContainer = new SamzaContainer(
- this.containerContext,
- Map(TASK_NAME -> this.taskInstance),
- this.runLoop,
- this.systemAdmins,
- this.consumerMultiplexer,
- this.producerMultiplexer,
- metrics)
- this.samzaContainer.setContainerListener(this.samzaContainerListener)
+ setupSamzaContainer(Some(this.applicationContainerContext))
when(this.metrics.containerStartupTime).thenReturn(mock[Timer])
}
@@ -173,6 +170,24 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
}
@Test
+ def testApplicationContainerContext() {
+ val orderVerifier = inOrder(this.applicationContainerContext, this.runLoop)
+ this.samzaContainer.run
+ orderVerifier.verify(this.applicationContainerContext).start()
+ orderVerifier.verify(this.runLoop).run()
+ orderVerifier.verify(this.applicationContainerContext).stop()
+ }
+
+ @Test
+ def testNullApplicationContainerContextFactory() {
+ setupSamzaContainer(None)
+ this.samzaContainer.run
+ verify(this.runLoop).run()
+ // applicationContainerContext is not even wired into the container anymore, but just double check it is not used
+ verifyZeroInteractions(this.applicationContainerContext)
+ }
+
+ @Test
def testReadJobModel() {
val config = new MapConfig(Map("a" -> "b").asJava)
val offsets = new util.HashMap[SystemStreamPartition, String]()
@@ -258,6 +273,20 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
assertEquals(Set(), SamzaContainer.getChangelogSSPsForContainer(containerModel, Map()))
}
+ private def setupSamzaContainer(applicationContainerContext: Option[ApplicationContainerContext]) {
+ this.samzaContainer = new SamzaContainer(
+ this.config,
+ Map(TASK_NAME -> this.taskInstance),
+ this.runLoop,
+ this.systemAdmins,
+ this.consumerMultiplexer,
+ this.producerMultiplexer,
+ metrics,
+ containerContext = this.containerContext,
+ applicationContainerContextOption = applicationContainerContext)
+ this.samzaContainer.setContainerListener(this.samzaContainerListener)
+ }
+
class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) {
var exceptionCount = 0
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index b196131..15534cd 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -22,7 +22,8 @@ package org.apache.samza.container
import org.apache.samza.Partition
import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
-import org.apache.samza.config.Config
+import org.apache.samza.context.{TaskContext => _, _}
+import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.Counter
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemConsumers, SystemStream, _}
@@ -34,11 +35,12 @@ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{Matchers, Mock, MockitoAnnotations}
+import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
-class TestTaskInstance extends MockitoSugar {
+class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
private val SYSTEM_NAME = "test-system"
private val TASK_NAME = new TaskName("taskName")
private val SYSTEM_STREAM_PARTITION =
@@ -48,7 +50,7 @@ class TestTaskInstance extends MockitoSugar {
@Mock
private var task: AllTask = null
@Mock
- private var config: Config = null
+ private var taskModel: TaskModel = null
@Mock
private var metrics: TaskInstanceMetrics = null
@Mock
@@ -60,13 +62,21 @@ class TestTaskInstance extends MockitoSugar {
@Mock
private var collector: TaskInstanceCollector = null
@Mock
- private var containerContext: SamzaContainerContext = null
- @Mock
private var offsetManager: OffsetManager = null
@Mock
private var taskStorageManager: TaskStorageManager = null
// not a mock; using MockTaskInstanceExceptionHandler
private var taskInstanceExceptionHandler: MockTaskInstanceExceptionHandler = null
+ @Mock
+ private var jobContext: JobContext = null
+ @Mock
+ private var containerContext: ContainerContext = null
+ @Mock
+ private var applicationContainerContext: ApplicationContainerContext = null
+ @Mock
+ private var applicationTaskContextFactory: ApplicationTaskContextFactory[ApplicationTaskContext] = null
+ @Mock
+ private var applicationTaskContext: ApplicationTaskContext = null
private var taskInstance: TaskInstance = null
@@ -75,19 +85,12 @@ class TestTaskInstance extends MockitoSugar {
MockitoAnnotations.initMocks(this)
// not using Mockito mock since Mockito doesn't work well with the call-by-name argument in maybeHandle
this.taskInstanceExceptionHandler = new MockTaskInstanceExceptionHandler
- this.taskInstance = new TaskInstance(this.task,
- TASK_NAME,
- this.config,
- this.metrics,
- this.systemAdmins,
- this.consumerMultiplexer,
- this.collector,
- this.containerContext,
- this.offsetManager,
- storageManager = this.taskStorageManager,
- systemStreamPartitions = SYSTEM_STREAM_PARTITIONS,
- exceptionHandler = this.taskInstanceExceptionHandler)
+ when(this.taskModel.getTaskName).thenReturn(TASK_NAME)
+ when(this.applicationTaskContextFactory.create(Matchers.eq(this.jobContext), Matchers.eq(this.containerContext),
+ any(), Matchers.eq(this.applicationContainerContext)))
+ .thenReturn(this.applicationTaskContext)
when(this.systemAdmins.getSystemAdmin(SYSTEM_NAME)).thenReturn(this.systemAdmin)
+ setupTaskInstance(Some(this.applicationTaskContextFactory))
}
@Test
@@ -133,10 +136,10 @@ class TestTaskInstance extends MockitoSugar {
*/
@Test
def testManualOffsetReset() {
- when(this.task.init(any(), any())).thenAnswer(new Answer[Void] {
+ when(this.task.init(any())).thenAnswer(new Answer[Void] {
override def answer(invocation: InvocationOnMock): Void = {
- val taskContext = invocation.getArgumentAt(1, classOf[TaskContext])
- taskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10")
+ val context = invocation.getArgumentAt(0, classOf[Context])
+ context.getTaskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10")
null
}
})
@@ -198,6 +201,35 @@ class TestTaskInstance extends MockitoSugar {
verify(commitsCounter).inc()
}
+ /**
+ * Given that an application task context factory is provided, then lifecycle calls should be made and the context
+ * should be accessible.
+ */
+ @Test
+ def testApplicationTaskContextFactoryProvided(): Unit = {
+ assertEquals(this.applicationTaskContext, this.taskInstance.context.getApplicationTaskContext)
+ this.taskInstance.initTask
+ verify(this.applicationTaskContext).start()
+ verify(this.applicationTaskContext, never()).stop()
+ this.taskInstance.shutdownTask
+ verify(this.applicationTaskContext).stop()
+ }
+
+ /**
+ * Given that no application task context factory is provided, then no lifecycle calls should be made. Also, an
+ * exception should be thrown if the application task context is accessed.
+ */
+ @Test
+ def testNoApplicationTaskContextFactoryProvided() {
+ setupTaskInstance(None)
+ this.taskInstance.initTask
+ this.taskInstance.shutdownTask
+ verifyZeroInteractions(this.applicationTaskContext)
+ intercept[IllegalStateException] {
+ this.taskInstance.context.getApplicationTaskContext
+ }
+ }
+
@Test(expected = classOf[SystemProducerException])
def testProducerExceptionsIsPropagated() {
when(this.metrics.commits).thenReturn(mock[Counter])
@@ -210,6 +242,24 @@ class TestTaskInstance extends MockitoSugar {
}
}
+ private def setupTaskInstance(
+ applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = {
+ this.taskInstance = new TaskInstance(this.task,
+ this.taskModel,
+ this.metrics,
+ this.systemAdmins,
+ this.consumerMultiplexer,
+ this.collector,
+ offsetManager = this.offsetManager,
+ storageManager = this.taskStorageManager,
+ systemStreamPartitions = SYSTEM_STREAM_PARTITIONS,
+ exceptionHandler = this.taskInstanceExceptionHandler,
+ jobContext = this.jobContext,
+ containerContext = this.containerContext,
+ applicationContainerContextOption = Some(this.applicationContainerContext),
+ applicationTaskContextFactoryOption = applicationTaskContextFactory)
+ }
+
/**
* Task type which has all task traits, which can be mocked.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 0b951f4..59f8662 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -18,21 +18,25 @@
*/
package org.apache.samza.processor
-import java.util.Collections
+import java.util
+import org.apache.samza.Partition
import org.apache.samza.config.MapConfig
import org.apache.samza.container._
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.context.{ContainerContext, JobContext}
+import org.apache.samza.job.model.TaskModel
import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.system._
+import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
+import org.mockito.Mockito
object StreamProcessorTestUtils {
def getDummyContainer(mockRunloop: RunLoop, streamTask: StreamTask) = {
- val config = new MapConfig
+ val config = new MapConfig()
val taskName = new TaskName("taskName")
+ val taskModel = new TaskModel(taskName, new util.HashSet[SystemStreamPartition](), new Partition(0))
val adminMultiplexer = new SystemAdmins(config)
val consumerMultiplexer = new SystemConsumers(
new RoundRobinChooser,
@@ -41,26 +45,29 @@ object StreamProcessorTestUtils {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName), new MetricsRegistryMap)
+ val containerContext = Mockito.mock(classOf[ContainerContext])
val taskInstance: TaskInstance = new TaskInstance(
streamTask,
- taskName,
- config,
+ taskModel,
new TaskInstanceMetrics,
- null,
+ adminMultiplexer,
consumerMultiplexer,
collector,
- containerContext
- )
+ jobContext = Mockito.mock(classOf[JobContext]),
+ containerContext = containerContext,
+ applicationContainerContextOption = None,
+ applicationTaskContextFactoryOption = None)
val container = new SamzaContainer(
- containerContext = containerContext,
+ config = config,
taskInstances = Map(taskName -> taskInstance),
runLoop = mockRunloop,
systemAdmins = adminMultiplexer,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
- metrics = new SamzaContainerMetrics)
+ metrics = new SamzaContainerMetrics,
+ containerContext = containerContext,
+ applicationContainerContextOption = None)
container
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
index 53147ad..e30328a 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
@@ -21,18 +21,19 @@ package org.apache.samza.storage.kv.inmemory
import java.io.File
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.context.{ContainerContext, JobContext}
import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.storage.kv.{KeyValueStoreMetrics, BaseKeyValueStorageEngineFactory, KeyValueStore}
+import org.apache.samza.storage.kv.{BaseKeyValueStorageEngineFactory, KeyValueStore, KeyValueStoreMetrics}
import org.apache.samza.system.SystemStreamPartition
class InMemoryKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
override def getKVStore(storeName: String,
- storeDir: File,
- registry: MetricsRegistry,
- changeLogSystemStreamPartition: SystemStreamPartition,
- containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+ storeDir: File,
+ registry: MetricsRegistry,
+ changeLogSystemStreamPartition: SystemStreamPartition,
+ jobContext: JobContext,
+ containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
val metrics = new KeyValueStoreMetrics(storeName, registry)
val inMemoryDb = new InMemoryKeyValueStore (metrics)
inMemoryDb
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 9dca23c..0734fe6 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -19,16 +19,11 @@
package org.apache.samza.storage.kv;
-import java.util.ArrayList;
-
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSerializerConfig;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.SerializerConfig$;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.util.Util;
@@ -65,11 +60,7 @@ public class RocksDbKeyValueReader {
valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName), serializerConfig);
// get db options
- ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
- taskNameList.add(new TaskName("read-rocks-db"));
- SamzaContainerContext samzaContainerContext =
- new SamzaContainerContext("0", config, taskNameList, new MetricsRegistryMap());
- Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
+ Options options = RocksDbOptionsHelper.options(config, 1);
// open the db
RocksDB.loadLibrary();
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 9389681..7beb066 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -20,7 +20,8 @@
package org.apache.samza.storage.kv;
import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
@@ -41,12 +42,11 @@ public class RocksDbOptionsHelper {
private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
- public static Options options(Config storeConfig, SamzaContainerContext containerContext) {
+ public static Options options(Config storeConfig, int numTasksForContainer) {
Options options = new Options();
Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
// Cache size and write buffer size are specified on a per-container basis.
- int numTasks = containerContext.taskNames.size();
- options.setWriteBufferSize((int) (writeBufSize / numTasks));
+ options.setWriteBufferSize((int) (writeBufSize / numTasksForContainer));
CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy");
@@ -75,7 +75,7 @@ public class RocksDbOptionsHelper {
}
options.setCompressionType(compressionType);
- long blockCacheSize = getBlockCacheSize(storeConfig, containerContext);
+ long blockCacheSize = getBlockCacheSize(storeConfig, numTasksForContainer);
int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize);
@@ -109,9 +109,8 @@ public class RocksDbOptionsHelper {
return options;
}
- public static Long getBlockCacheSize(Config storeConfig, SamzaContainerContext containerContext) {
- int numTasks = containerContext.taskNames.size();
+ public static Long getBlockCacheSize(Config storeConfig, int numTasksForContainer) {
long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
- return cacheSize / numTasks;
+ return cacheSize / numTasksForContainer;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index 2b7ffb5..704af4a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -20,11 +20,12 @@
package org.apache.samza.storage.kv
import java.io.File
-import org.apache.samza.container.SamzaContainerContext
+
+import org.apache.samza.config.StorageConfig._
+import org.apache.samza.context.{ContainerContext, JobContext}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamPartition
import org.rocksdb.{FlushOptions, WriteOptions}
-import org.apache.samza.config.StorageConfig._
class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
/**
@@ -37,17 +38,19 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
* @return A valid KeyValueStore instance
*/
override def getKVStore(storeName: String,
- storeDir: File,
- registry: MetricsRegistry,
- changeLogSystemStreamPartition: SystemStreamPartition,
- containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
- val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
- val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined
+ storeDir: File,
+ registry: MetricsRegistry,
+ changeLogSystemStreamPartition: SystemStreamPartition,
+ jobContext: JobContext,
+ containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+ val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true)
+ val isLoggedStore = jobContext.getConfig.getChangelogStream(storeName).isDefined
val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
+ val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size()
rocksDbMetrics.newGauge("rocksdb.block-cache-size",
- () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, containerContext))
+ () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, numTasksForContainer))
- val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
+ val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, numTasksForContainer)
val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
val rocksDb = new RocksDbKeyValueStore(
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
index 35a66e8..cd7e85c 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
@@ -18,14 +18,13 @@
*/
package org.apache.samza.storage.kv;
+import junit.framework.Assert;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.table.TableSpec;
import org.junit.Test;
-import junit.framework.Assert;
-
public class TestRocksDbTableDescriptor {
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index 8231905..e56c977 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -18,11 +18,11 @@
*/
package org.apache.samza.storage.kv;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -31,15 +31,12 @@ import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.utils.BaseTableProvider;
import org.apache.samza.table.utils.SerdeUtils;
-import org.apache.samza.task.TaskContext;
-
-import com.google.common.base.Preconditions;
/**
@@ -59,13 +56,12 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
}
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-
- super.init(containerContext, taskContext);
+ public void init(Context context) {
+ super.init(context);
- Preconditions.checkNotNull(this.taskContext, "Must specify task context for local tables.");
+ Preconditions.checkNotNull(this.context, "Must specify context for local tables.");
- kvStore = (KeyValueStore) taskContext.getStore(tableSpec.getId());
+ kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId());
if (kvStore == null) {
throw new SamzaException(String.format(
@@ -81,7 +77,7 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
throw new SamzaException("Store not initialized for table " + tableSpec.getId());
}
ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
- table.init(containerContext, taskContext);
+ table.init(this.context);
return table;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
index 9eeb55e..804df43 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -20,11 +20,9 @@ package org.apache.samza.storage.kv;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.utils.DefaultTableWriteMetrics;
-import org.apache.samza.task.TaskContext;
/**
@@ -51,9 +49,9 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- super.init(containerContext, taskContext);
- writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
+ public void init(Context context) {
+ super.init(context);
+ writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
index d0629c4..d440d42 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
@@ -18,15 +18,13 @@
*/
package org.apache.samza.storage.kv;
+import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-
-import com.google.common.base.Preconditions;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.utils.DefaultTableReadMetrics;
-import org.apache.samza.task.TaskContext;
/**
@@ -58,8 +56,8 @@ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V>
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
+ public void init(Context context) {
+ readMetrics = new DefaultTableReadMetrics(context, this, tableId);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index da80560..d962e93 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -22,14 +22,14 @@ package org.apache.samza.storage.kv
import java.io.File
import org.apache.samza.SamzaException
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.context.{ContainerContext, JobContext}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.Serde
import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties}
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.MessageCollector
-import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil}
+import org.apache.samza.util.HighResolutionClock
/**
* A key value storage engine factory implementation
@@ -52,11 +52,12 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
* @param containerContext Information about the container in which the task is executing.
* @return A valid KeyValueStore instance
*/
- def getKVStore( storeName: String,
- storeDir: File,
- registry: MetricsRegistry,
- changeLogSystemStreamPartition: SystemStreamPartition,
- containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]]
+ def getKVStore(storeName: String,
+ storeDir: File,
+ registry: MetricsRegistry,
+ changeLogSystemStreamPartition: SystemStreamPartition,
+ jobContext: JobContext,
+ containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]]
/**
* Constructs a key-value StorageEngine and returns it to the caller
@@ -70,15 +71,16 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
* @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
* @param containerContext Information about the container in which the task is executing.
**/
- def getStorageEngine( storeName: String,
- storeDir: File,
- keySerde: Serde[K],
- msgSerde: Serde[V],
- collector: MessageCollector,
- registry: MetricsRegistry,
- changeLogSystemStreamPartition: SystemStreamPartition,
- containerContext: SamzaContainerContext): StorageEngine = {
- val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
+ def getStorageEngine(storeName: String,
+ storeDir: File,
+ keySerde: Serde[K],
+ msgSerde: Serde[V],
+ collector: MessageCollector,
+ registry: MetricsRegistry,
+ changeLogSystemStreamPartition: SystemStreamPartition,
+ jobContext: JobContext,
+ containerContext: ContainerContext): StorageEngine = {
+ val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true)
val storeFactory = storageConfig.get("factory")
var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder()
val accessLog = storageConfig.getBoolean("accesslog.enabled", false)
@@ -106,7 +108,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
throw new SamzaException("Must define a message serde when using key value storage.")
}
- val rawStore = getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
+ val rawStore =
+ getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, jobContext, containerContext)
// maybe wrap with logging
val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
@@ -141,7 +144,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
// create the storage engine and return
// TODO: Decide if we should use raw bytes when restoring
val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
- val clock = if (containerContext.config.getMetricsTimerEnabled) {
+ val clock = if (jobContext.getConfig.getMetricsTimerEnabled) {
new HighResolutionClock {
override def nanoTime(): Long = System.nanoTime()
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
index 2b0166c..399f9fd 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
@@ -28,33 +28,33 @@ import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Test;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestBaseLocalStoreBackedTableProvider {
@Test
public void testInit() {
- StorageEngine store = mock(KeyValueStorageEngine.class);
- SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+ Context context = mock(Context.class);
TaskContext taskContext = mock(TaskContext.class);
- when(taskContext.getStore(any())).thenReturn(store);
- when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+ when(context.getTaskContext()).thenReturn(taskContext);
+ when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
+ when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
TableSpec tableSpec = mock(TableSpec.class);
when(tableSpec.getId()).thenReturn("t1");
TableProvider tableProvider = createTableProvider(tableSpec);
- tableProvider.init(containerContext, taskContext);
+ tableProvider.init(context);
Assert.assertNotNull(tableProvider.getTable());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java
new file mode 100644
index 0000000..6841e15
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.runner;
+
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.sql.translator.TranslatorContext;
+
+
+public class SamzaSqlApplicationContext implements ApplicationTaskContext {
+ private final TranslatorContext translatorContext;
+
+ public SamzaSqlApplicationContext(TranslatorContext translatorContext) {
+ this.translatorContext = translatorContext;
+ }
+
+ public TranslatorContext getTranslatorContext() {
+ return translatorContext;
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index f33c5ca..77a24f8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -21,14 +21,13 @@ package org.apache.samza.sql.translator;
import java.util.Arrays;
import java.util.Collections;
-
import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +52,8 @@ class FilterTranslator {
}
@Override
- public void init(Config config, TaskContext context) {
- this.context = (TranslatorContext) context.getUserContext();
+ public void init(Context context) {
+ this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
this.filter = (LogicalFilter) this.context.getRelNode(filterId);
this.expr = this.context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
index 965338f..435a2cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.TableModify;
import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
@@ -39,8 +39,8 @@ import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.apache.samza.table.Table;
-import org.apache.samza.task.TaskContext;
/**
@@ -70,9 +70,10 @@ class ModifyTranslator {
}
@Override
- public void init(Config config, TaskContext taskContext) {
- TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
- this.samzaMsgConverter = context.getMsgConverter(outputTopic);
+ public void init(Context context) {
+ TranslatorContext translatorContext =
+ ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
+ this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 8e6f687..9a1ff84 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -31,12 +31,12 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +61,8 @@ class ProjectTranslator {
}
@Override
- public void init(Config config, TaskContext taskContext) {
- this.context = (TranslatorContext) taskContext.getUserContext();
+ public void init(Context context) {
+ this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
this.project = (Project) this.context.getRelNode(projectId);
this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 3a35b97..b13043f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -31,16 +31,17 @@ import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.task.TaskContext;
-
/**
* This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries.
@@ -72,8 +73,8 @@ public class QueryTranslator {
public void translate(RelRoot relRoot, StreamApplicationDescriptor appDesc) {
final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
- final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
- final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
+ final TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
+ final SqlIOResolver ioResolver = translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
final RelNode node = relRoot.project();
node.accept(new RelShuttleImpl() {
@@ -93,28 +94,28 @@ public class QueryTranslator {
throw new SamzaException("Not a supported operation: " + modify.toString());
}
RelNode node = super.visit(modify);
- modifyTranslator.translate(modify, context);
+ modifyTranslator.translate(modify, translatorContext);
return node;
}
@Override
public RelNode visit(TableScan scan) {
RelNode node = super.visit(scan);
- scanTranslator.translate(scan, context);
+ scanTranslator.translate(scan, translatorContext);
return node;
}
@Override
public RelNode visit(LogicalFilter filter) {
RelNode node = visitChild(filter, 0, filter.getInput());
- new FilterTranslator().translate(filter, context);
+ new FilterTranslator().translate(filter, translatorContext);
return node;
}
@Override
public RelNode visit(LogicalProject project) {
RelNode node = super.visit(project);
- new ProjectTranslator().translate(project, context);
+ new ProjectTranslator().translate(project, translatorContext);
return node;
}
@@ -122,7 +123,7 @@ public class QueryTranslator {
public RelNode visit(LogicalJoin join) {
RelNode node = super.visit(join);
joinId++;
- new JoinTranslator(joinId, ioResolver).translate(join, context);
+ new JoinTranslator(joinId, ioResolver).translate(join, translatorContext);
return node;
}
@@ -130,23 +131,21 @@ public class QueryTranslator {
public RelNode visit(LogicalAggregate aggregate) {
RelNode node = super.visit(aggregate);
windowId++;
- new LogicalAggregateTranslator(windowId).translate(aggregate, context);
+ new LogicalAggregateTranslator(windowId).translate(aggregate, translatorContext);
return node;
}
});
- appDesc.withContextManager(new ContextManager() {
- @Override
- public void init(Config config, TaskContext taskContext) {
- taskContext.setUserContext(context.clone());
- }
-
- @Override
- public void close() {
-
- }
-
- });
-
+ /*
+ * TODO: translatorContext is not Serializable, yet it is captured by the factory lambda below. This is fine for
+ * now because a new ApplicationDescriptor instance is created in each container, so translatorContext is
+ * recreated per container and never needs to be serialized. If serialization of ApplicationDescriptor is ever
+ * actually needed, this code will have to be updated.
+ */
+ appDesc.withApplicationTaskContextFactory((jobContext,
+ containerContext,
+ taskContext,
+ applicationContainerContext) ->
+ new SamzaSqlApplicationContext(translatorContext.clone()));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 771a5d5..be94160 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.calcite.rel.core.TableScan;
import org.apache.commons.lang.Validate;
import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
@@ -34,8 +34,8 @@ import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
/**
@@ -64,9 +64,10 @@ class ScanTranslator {
}
@Override
- public void init(Config config, TaskContext taskContext) {
- TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
- this.msgConverter = context.getMsgConverter(streamName);
+ public void init(Context context) {
+ TranslatorContext translatorContext =
+ ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
+ this.msgConverter = translatorContext.getMsgConverter(streamName);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
index 2005c21..ec0a993 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -22,7 +22,6 @@ package org.apache.samza.sql.e2e;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index 1ac804e..f0df3a9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -20,14 +20,12 @@
package org.apache.samza.sql.runner;
import java.util.Map;
-
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.runtime.RemoteApplicationRunner;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.junit.Assert;
-
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 458196f..fd811cd 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.config.Config;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 8318e8a..4c78b5a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-
import org.apache.commons.lang.NotImplementedException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.BaseTableDescriptor;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
index a84f347..dd98b92 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -23,8 +23,6 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
-
-import org.apache.samza.sql.testutil.SqlFileParser;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index e7c2195..07ebe33 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -21,14 +21,11 @@ package org.apache.samza.sql.translator;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FilterFunction;
@@ -38,6 +35,7 @@ import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
@@ -50,8 +48,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -96,11 +94,9 @@ public class TestFilterTranslator extends TranslatorTestBase {
assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER);
// Verify that the describe() method will establish the context for the filter function
- Config mockConfig = mock(Config.class);
- TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
- new HashSet<>(), null, null, null, null, null, null);
- taskContext.setUserContext(mockContext);
- filterSpec.getTransformFn().init(mockConfig, taskContext);
+ Context context = mock(Context.class);
+ when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext));
+ filterSpec.getTransformFn().init(context);
FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn");
assertNotNull(filterFn);
assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context"));
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 3046c1f..2ed7a00 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
@@ -33,9 +32,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.calcite.util.Pair;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.MapFunction;
@@ -47,6 +44,7 @@ import org.apache.samza.sql.data.Expression;
import org.apache.samza.sql.data.RexToJavaCompiler;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.Whitebox;
@@ -58,8 +56,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -114,11 +112,9 @@ public class TestProjectTranslator extends TranslatorTestBase {
assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
// Verify that the bootstrap() method will establish the context for the map function
- Config mockConfig = mock(Config.class);
- TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
- new HashSet<>(), null, null, null, null, null, null);
- taskContext.setUserContext(mockContext);
- projectSpec.getTransformFn().init(mockConfig, taskContext);
+ Context context = mock(Context.class);
+ when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext));
+ projectSpec.getTransformFn().init(context);
MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
assertNotNull(mapFn);
assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
@@ -249,11 +245,9 @@ public class TestProjectTranslator extends TranslatorTestBase {
assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
// Verify that the describe() method will establish the context for the map function
- Config mockConfig = mock(Config.class);
- TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
- new HashSet<>(), null, null, null, null, null, null);
- taskContext.setUserContext(mockContext);
- projectSpec.getTransformFn().init(mockConfig, taskContext);
+ Context context = mock(Context.class);
+ when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext));
+ projectSpec.getTransformFn().init(context);
MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
assertNotNull(mapFn);
assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
@@ -285,5 +279,4 @@ public class TestProjectTranslator extends TranslatorTestBase {
}});
}
-
}