You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/10 22:34:27 UTC
[4/5] samza git commit: SAMZA-1714: Creating shared context factory
for shared context objects
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 63e269d..88644ce 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -18,8 +18,8 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContextImpl;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -30,7 +30,6 @@ import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import java.util.Collection;
@@ -50,20 +49,20 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
private final ControlMessageSender controlMessageSender;
PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
- SystemStream systemStream, TaskContext context) {
+ SystemStream systemStream, Context context) {
this.partitionByOpSpec = partitionByOpSpec;
this.systemStream = systemStream;
this.keyFunction = partitionByOpSpec.getKeyFunction();
this.valueFunction = partitionByOpSpec.getValueFunction();
- this.taskName = context.getTaskName().getTaskName();
- StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache();
+ this.taskName = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
+ StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context.getTaskContext()).getStreamMetadataCache();
this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
}
@Override
- protected void handleInit(Config config, TaskContext context) {
- this.keyFunction.init(config, context);
- this.valueFunction.init(config, context);
+ protected void handleInit(Context context) {
+ this.keyFunction.init(context);
+ this.valueFunction.init(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 5ce1328..be3e0a3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -18,18 +18,17 @@
*/
package org.apache.samza.operators.impl;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import java.util.Collection;
+import java.util.Collections;
+
/**
* Implementation of a send-stream-to-table operator that stores the record
@@ -43,13 +42,13 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
private final SendToTableOperatorSpec<K, V> sendToTableOpSpec;
private final ReadWriteTable<K, V> table;
- SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Config config, TaskContext context) {
+ SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
this.sendToTableOpSpec = sendToTableOpSpec;
- this.table = (ReadWriteTable) context.getTable(sendToTableOpSpec.getTableSpec().getId());
+ this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableSpec().getId());
}
@Override
- protected void handleInit(Config config, TaskContext context) {
+ protected void handleInit(Context context) {
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 5dbe27f..6fe9006 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import java.util.Collection;
@@ -38,14 +37,14 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, Void> {
private final SinkOperatorSpec<M> sinkOpSpec;
private final SinkFunction<M> sinkFn;
- SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec, Config config, TaskContext context) {
+ SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec) {
this.sinkOpSpec = sinkOpSpec;
this.sinkFn = sinkOpSpec.getSinkFn();
}
@Override
- protected void handleInit(Config config, TaskContext context) {
- this.sinkFn.init(config, context);
+ protected void handleInit(Context context) {
+ this.sinkFn.init(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 6cd426b..1a615bd 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -18,12 +18,11 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import java.util.Collection;
@@ -46,8 +45,8 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
}
@Override
- protected void handleInit(Config config, TaskContext context) {
- transformFn.init(config, context);
+ protected void handleInit(Context context) {
+ transformFn.init(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 54a5770..d44241d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -18,18 +18,17 @@
*/
package org.apache.samza.operators.impl;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import java.util.Collection;
+import java.util.Collections;
+
/**
* Implementation of a stream-table join operator that first retrieve the value of
@@ -45,15 +44,14 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M
private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
private final ReadableTable<K, ?> table;
- StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec,
- Config config, TaskContext context) {
+ StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
this.joinOpSpec = joinOpSpec;
- this.table = (ReadableTable) context.getTable(joinOpSpec.getTableSpec().getId());
+ this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableSpec().getId());
}
@Override
- protected void handleInit(Config config, TaskContext context) {
- this.joinOpSpec.getJoinFn().init(config, context);
+ protected void handleInit(Context context) {
+ this.joinOpSpec.getJoinFn().init(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index b175671..c09c5f8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -21,14 +21,13 @@
package org.apache.samza.operators.impl;
import com.google.common.base.Preconditions;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.impl.store.TimeSeriesKey;
import org.apache.samza.operators.impl.store.TimeSeriesStore;
import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
-import org.apache.samza.util.TimestampedValue;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.triggers.FiringType;
@@ -45,9 +44,9 @@ import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.storage.kv.ClosableIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
+import org.apache.samza.util.TimestampedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,23 +110,23 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
}
@Override
- protected void handleInit(Config config, TaskContext context) {
+ protected void handleInit(Context context) {
KeyValueStore<TimeSeriesKey<K>, Object> store =
- (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId());
+ (KeyValueStore<TimeSeriesKey<K>, Object>) context.getTaskContext().getStore(windowOpSpec.getOpId());
if (initializer != null) {
- initializer.init(config, context);
+ initializer.init(context);
}
if (keyFn != null) {
- keyFn.init(config, context);
+ keyFn.init(context);
}
// For aggregating windows, we use the store in over-write mode since we only retain the aggregated
// value. Else, we use the store in append-mode.
if (foldLeftFn != null) {
- foldLeftFn.init(config, context);
+ foldLeftFn.init(context);
timeSeriesStore = new TimeSeriesStoreImpl(store, false);
} else {
timeSeriesStore = new TimeSeriesStoreImpl(store, true);
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
index 4e640dc..c1d62f5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
@@ -20,12 +20,11 @@ package org.apache.samza.operators.spec;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.task.TaskContext;
/**
@@ -50,8 +49,8 @@ class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> {
}
@Override
- public void init(Config config, TaskContext context) {
- filterFn.init(config, context);
+ public void init(Context context) {
+ filterFn.init(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
index 6ce522f..d3a587a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
@@ -20,12 +20,11 @@ package org.apache.samza.operators.spec;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.task.TaskContext;
/**
@@ -53,8 +52,8 @@ class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
}
@Override
- public void init(Config config, TaskContext context) {
- mapFn.init(config, context);
+ public void init(Context context) {
+ mapFn.init(context);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 26e52f2..3149989 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -35,6 +36,11 @@ import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -46,6 +52,8 @@ import org.apache.samza.util.ScalaJavaUtil;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
+
/**
* StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
@@ -97,6 +105,16 @@ public class StreamProcessor {
private final JobCoordinator jobCoordinator;
private final ProcessorLifecycleListener processorListener;
private final TaskFactory taskFactory;
+ /**
+ * Type parameter needs to be {@link ApplicationContainerContext} so that we can eventually call the base methods of
+ * the context object.
+ */
+ private final Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional;
+ /**
+ * Type parameter needs to be {@link ApplicationTaskContext} so that we can eventually call the base methods of the
+ * context object.
+ */
+ private final Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional;
private final Map<String, MetricsReporter> customMetricsReporter;
private final Config config;
private final long taskShutdownMs;
@@ -143,57 +161,60 @@ public class StreamProcessor {
JobCoordinatorListener jobCoordinatorListener = null;
/**
- * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}.
- *
- * <p>
- * On startup, StreamProcessor starts the JobCoordinator. Schedules the SamzaContainer to run in a ExecutorService
- * when it receives new {@link JobModel} from JobCoordinator.
- * <p>
- *
- * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor.
+ * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except
+ * it creates a {@link JobCoordinator} instead of accepting it as an argument.
*
- * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}.
- * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics.
- * @param taskFactory the {@link TaskFactory} to be used for creating task instances.
- * @param processorListener listener to the StreamProcessor life cycle.
+ * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
*/
+ @Deprecated
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
ProcessorLifecycleListener processorListener) {
this(config, customMetricsReporters, taskFactory, processorListener, null);
}
/**
- * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener)}, except the
- * {@link JobCoordinator} is given for this {@link StreamProcessor}.
- * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}
- * @param customMetricsReporters metric Reporter
- * @param taskFactory task factory to instantiate the Task
+ * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences:
+ * <ol>
+ * <li>Passes null for application-defined context factories</li>
+ * <li>Accepts a {@link ProcessorLifecycleListener} directly instead of a
+ * {@link StreamProcessorLifecycleListenerFactory}</li>
+ * </ol>
+ * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
+ *
* @param processorListener listener to the StreamProcessor life cycle
- * @param jobCoordinator the instance of {@link JobCoordinator}
*/
+ @Deprecated
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
- this(config, customMetricsReporters, taskFactory, sp -> processorListener, jobCoordinator);
+ this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), sp -> processorListener,
+ jobCoordinator);
}
/**
- * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except
- * there is a {@link StreamProcessorLifecycleListenerFactory} as input instead of {@link ProcessorLifecycleListener}.
- * This is useful to create a {@link ProcessorLifecycleListener} with a reference to this {@link StreamProcessor}
+ * Builds a {@link StreamProcessor} with full specification of processing components.
*
* @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}
- * @param customMetricsReporters metric Reporter
+ * @param customMetricsReporters registered with the metrics system to report metrics
* @param taskFactory task factory to instantiate the Task
- * @param listenerFactory listener to the StreamProcessor life cycle
+ * @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context
+ * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
+ * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
* @param jobCoordinator the instance of {@link JobCoordinator}
*/
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
+ Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
+ Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) {
Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null.");
- this.taskFactory = taskFactory;
this.config = config;
- this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
this.customMetricsReporter = customMetricsReporters;
+ this.taskFactory = taskFactory;
+ this.applicationDefinedContainerContextFactoryOptional = applicationDefinedContainerContextFactoryOptional;
+ this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional;
+ this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : createJobCoordinator();
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
@@ -283,7 +304,10 @@ public class StreamProcessor {
@VisibleForTesting
SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
- return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
+ return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
+ this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
+ Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
+ Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)));
}
private JobCoordinator createJobCoordinator() {
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7100482..a5eeba1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -167,7 +167,8 @@ public class LocalApplicationRunner implements ApplicationRunner {
// TODO: the null processorId has to be fixed after SAMZA-1835
appDesc.getMetricsReporterFactories().forEach((name, factory) ->
reporters.put(name, factory.getMetricsReporter(name, null, config)));
- return new StreamProcessor(config, reporters, taskFactory, listenerFactory, null);
+ return new StreamProcessor(config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
+ appDesc.getApplicationTaskContextFactory(), listenerFactory, null);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index add7e69..94ff1eb 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -25,8 +25,8 @@ import java.util.Random;
import org.slf4j.MDC;
import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorUtil;
import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.ApplicationDescriptorUtil;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -36,6 +36,7 @@ import org.apache.samza.container.ContainerHeartbeatMonitor;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainer$;
import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.JobContextImpl;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.task.TaskFactory;
@@ -44,6 +45,8 @@ import org.apache.samza.util.SamzaUncaughtExceptionHandler;
import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
+
/**
* Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
@@ -93,9 +96,11 @@ public class LocalContainerRunner {
SamzaContainer container = SamzaContainer$.MODULE$.apply(
containerId,
jobModel,
- config,
ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
- taskFactory);
+ taskFactory,
+ JobContextImpl.fromConfigWithDefaults(config),
+ Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
+ Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)));
ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
.createInstance(new ProcessorContext() { }, config);
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 9a76d75..be074ee 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -27,13 +27,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.ContainerContextImpl;
+import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
import org.apache.samza.job.model.ContainerModel;
@@ -209,8 +210,7 @@ public class StorageRecovery extends CommandLine {
for (ContainerModel containerModel : containers.values()) {
HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
- SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getId(), jobConfig, containerModel.getTasks()
- .keySet(), new MetricsRegistryMap());
+ ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());
for (TaskModel taskModel : containerModel.getTasks().values()) {
HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
@@ -233,6 +233,7 @@ public class StorageRecovery extends CommandLine {
null,
new MetricsRegistryMap(),
changeLogSystemStreamPartition,
+ JobContextImpl.fromConfigWithDefaults(jobConfig),
containerContext);
taskStores.put(storeName, storageEngine);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index ae72414..d7b15a4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -18,21 +18,19 @@
*/
package org.apache.samza.table;
-import java.util.HashMap;
-import java.util.Map;
-
+import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
/**
@@ -97,12 +95,10 @@ public class TableManager {
/**
* Initialize table providers with container and task contexts
- * @param containerContext context for the Samza container
- * @param taskContext context for the current task, nullable for global tables
+ * @param context context for the task
*/
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- Preconditions.checkNotNull(containerContext, "null container context.");
- tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+ public void init(Context context) {
+ tableContexts.values().forEach(ctx -> ctx.tableProvider.init(context));
initialized = true;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index b7aa33c..32d2bed 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -19,25 +19,23 @@
package org.apache.samza.table.caching;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.utils.DefaultTableReadMetrics;
import org.apache.samza.table.utils.DefaultTableWriteMetrics;
import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
@@ -91,10 +89,10 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
- writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+ public void init(Context context) {
+ readMetrics = new DefaultTableReadMetrics(context, this, tableId);
+ writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
tableMetricsUtil.newGauge("hit-rate", () -> hitRate());
tableMetricsUtil.newGauge("miss-rate", () -> missRate());
tableMetricsUtil.newGauge("req-count", () -> requestCount());
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index d5f7767..c959a56 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -54,13 +54,13 @@ public class CachingTableProvider extends BaseTableProvider {
@Override
public Table getTable() {
String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID);
- ReadableTable table = (ReadableTable) taskContext.getTable(realTableId);
+ ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID);
ReadWriteTable cache;
if (cacheTableId != null) {
- cache = (ReadWriteTable) taskContext.getTable(cacheTableId);
+ cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
} else {
cache = createDefaultCacheTable(realTableId);
defaultCaches.add(cache);
@@ -68,7 +68,7 @@ public class CachingTableProvider extends BaseTableProvider {
boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND));
CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
- cachingTable.init(containerContext, taskContext);
+ cachingTable.init(this.context);
return cachingTable;
}
@@ -97,7 +97,7 @@ public class CachingTableProvider extends BaseTableProvider {
readTtlMs, writeTtlMs, cacheSize));
GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build());
- cacheTable.init(containerContext, taskContext);
+ cacheTable.init(this.context);
return cacheTable;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index a8beb3b..5f77ee4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -19,19 +19,17 @@
package org.apache.samza.table.caching.guava;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
+import com.google.common.cache.Cache;
import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
-import com.google.common.cache.Cache;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
@@ -54,8 +52,8 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+ public void init(Context context) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
// hit- and miss-rate are provided by CachingTable.
tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index 1513249..39f332e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -47,7 +47,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
public Table getTable() {
Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, tableSpec.getConfig().get(GUAVA_CACHE));
GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
- table.init(containerContext, taskContext);
+ table.init(this.context);
guavaTables.add(table);
return table;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 9ef4c1b..4cbc270 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -19,21 +19,19 @@
package org.apache.samza.table.remote;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.utils.DefaultTableWriteMetrics;
import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
/**
@@ -63,10 +61,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- super.init(containerContext, taskContext);
- writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+ public void init(Context context) {
+ super.init(context);
+ writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index b3d82f3..9487e39 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -19,28 +19,26 @@
package org.apache.samza.table.remote;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.utils.DefaultTableReadMetrics;
import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.function.Function;
/**
@@ -110,9 +108,9 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+ public void init(Context context) {
+ readMetrics = new DefaultTableReadMetrics(context, this, tableId);
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index cae0bbd..9415e70 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -19,14 +19,6 @@
package org.apache.samza.table.remote;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.retry.RetriableReadFunction;
@@ -37,6 +29,14 @@ import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.table.utils.TableMetricsUtil;
import org.apache.samza.util.RateLimiter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
@@ -83,7 +83,7 @@ public class RemoteTableProvider extends BaseTableProvider {
TableReadFunction readFn = getReadFn();
RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
if (rateLimiter != null) {
- rateLimiter.init(containerContext.config, taskContext);
+ rateLimiter.init(this.context);
}
TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RL_READ_TAG);
@@ -150,7 +150,7 @@ public class RemoteTableProvider extends BaseTableProvider {
writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
}
- TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+ TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
if (readRetryPolicy != null) {
((RetriableReadFunction) readFn).setMetrics(metricsUtil);
}
@@ -158,7 +158,7 @@ public class RemoteTableProvider extends BaseTableProvider {
((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
}
- table.init(containerContext, taskContext);
+ table.init(this.context);
tables.add(table);
return table;
}
@@ -184,7 +184,7 @@ public class RemoteTableProvider extends BaseTableProvider {
private TableReadFunction<?, ?> getReadFn() {
TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
if (readFn != null) {
- readFn.init(containerContext.config, taskContext);
+ readFn.init(this.context);
}
return readFn;
}
@@ -192,7 +192,7 @@ public class RemoteTableProvider extends BaseTableProvider {
private TableWriteFunction<?, ?> getWriteFn() {
TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
if (writeFn != null) {
- writeFn.init(containerContext.config, taskContext);
+ writeFn.init(this.context);
}
return writeFn;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
index 960e2a4..dfbd835 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
@@ -22,10 +22,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +38,7 @@ abstract public class BaseTableProvider implements TableProvider {
final protected TableSpec tableSpec;
- protected SamzaContainerContext containerContext;
- protected TaskContext taskContext;
+ protected Context context;
public BaseTableProvider(TableSpec tableSpec) {
this.tableSpec = tableSpec;
@@ -50,9 +48,8 @@ abstract public class BaseTableProvider implements TableProvider {
* {@inheritDoc}
*/
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
- this.containerContext = containerContext;
- this.taskContext = taskContext;
+ public void init(Context context) {
+ this.context = context;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
index 2acd082..090c8c1 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
@@ -18,11 +18,10 @@
*/
package org.apache.samza.table.utils;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
import org.apache.samza.table.Table;
-import org.apache.samza.task.TaskContext;
/**
@@ -39,14 +38,12 @@ public class DefaultTableReadMetrics {
/**
* Constructor based on container and task container context
*
- * @param containerContext container context
- * @param taskContext task context
+ * @param context {@link Context} for this task
* @param table underlying table
* @param tableId table Id
*/
- public DefaultTableReadMetrics(SamzaContainerContext containerContext, TaskContext taskContext,
- Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+ public DefaultTableReadMetrics(Context context, Table table, String tableId) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
getNs = tableMetricsUtil.newTimer("get-ns");
getAllNs = tableMetricsUtil.newTimer("getAll-ns");
numGets = tableMetricsUtil.newCounter("num-gets");
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
index a32d6d5..69d4ef2 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
@@ -18,11 +18,10 @@
*/
package org.apache.samza.table.utils;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
import org.apache.samza.table.Table;
-import org.apache.samza.task.TaskContext;
public class DefaultTableWriteMetrics {
@@ -43,14 +42,12 @@ public class DefaultTableWriteMetrics {
/**
* Utility class that contains the default set of write metrics.
*
- * @param containerContext container context
- * @param taskContext task context
+ * @param context {@link Context} for this task
* @param table underlying table
* @param tableId table Id
*/
- public DefaultTableWriteMetrics(SamzaContainerContext containerContext, TaskContext taskContext,
- Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+ public DefaultTableWriteMetrics(Context context, Table table, String tableId) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
putNs = tableMetricsUtil.newTimer("put-ns");
putAllNs = tableMetricsUtil.newTimer("putAll-ns");
deleteNs = tableMetricsUtil.newTimer("delete-ns");
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
index 6805c64..1b19272 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
@@ -19,18 +19,16 @@
package org.apache.samza.table.utils;
-import java.util.function.Supplier;
-
import com.google.common.base.Preconditions;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.table.Table;
import org.apache.samza.table.caching.SupplierGauge;
-import org.apache.samza.task.TaskContext;
+
+import java.util.function.Supplier;
/**
@@ -46,21 +44,16 @@ public class TableMetricsUtil {
/**
* Constructor based on container context
*
- * @param containerContext container context
- * @param taskContext task context
+ * @param context {@link Context} for this task
* @param table underlying table
* @param tableId table Id
*/
- public TableMetricsUtil(SamzaContainerContext containerContext, TaskContext taskContext,
- Table table, String tableId) {
-
- Preconditions.checkNotNull(containerContext);
+ public TableMetricsUtil(Context context, Table table, String tableId) {
+ Preconditions.checkNotNull(context);
Preconditions.checkNotNull(table);
Preconditions.checkNotNull(tableId);
- this.metricsRegistry = taskContext == null // The table is at container level, when the task
- ? containerContext.metricsRegistry // context passed in is null
- : taskContext.getMetricsRegistry();
+ this.metricsRegistry = context.getTaskContext().getTaskMetricsRegistry();
this.groupName = table.getClass().getSimpleName();
this.tableId = tableId;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 111869c..6c255f1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -33,16 +33,15 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-
import org.apache.samza.SamzaException;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
-import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.Throttleable;
import org.apache.samza.util.ThrottlingScheduler;
import org.slf4j.Logger;
@@ -374,7 +373,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
}, commitMs, commitMs, TimeUnit.MILLISECONDS);
}
- final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler();
+ final EpochTimeScheduler epochTimeScheduler = task.epochTimeScheduler();
if (epochTimeScheduler != null) {
epochTimeScheduler.registerListener(() -> {
state.needScheduler();
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
index e2fea95..fcd9766 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -20,7 +20,7 @@
package org.apache.samza.task;
import java.util.concurrent.ExecutorService;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -40,9 +40,9 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi
}
@Override
- public void init(Config config, TaskContext context) throws Exception {
+ public void init(Context context) throws Exception {
if (wrappedTask instanceof InitableTask) {
- ((InitableTask) wrappedTask).init(config, context);
+ ((InitableTask) wrappedTask).init(context);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index aa896c2..218ba5d 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,14 +18,13 @@
*/
package org.apache.samza.task;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.system.EndOfStreamMessage;
-import org.apache.samza.system.MessageType;
-import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.impl.InputOperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.MessageType;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.util.Clock;
@@ -42,8 +41,6 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
private final OperatorSpecGraph specGraph;
- // TODO: to be replaced by proper scope of shared context factory in SAMZA-1714
- private final ContextManager contextManager;
private final Clock clock;
private OperatorImplGraph operatorImplGraph;
@@ -52,17 +49,15 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
* Constructs an adaptor task to run the user-implemented {@link OperatorSpecGraph}.
* @param specGraph the serialized version of user-implemented {@link OperatorSpecGraph}
* that includes the logical DAG
- * @param contextManager the {@link ContextManager} used to set up the shared context used by operators in the DAG
* @param clock the {@link Clock} to use for time-keeping
*/
- public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager, Clock clock) {
+ public StreamOperatorTask(OperatorSpecGraph specGraph, Clock clock) {
this.specGraph = specGraph.clone();
- this.contextManager = contextManager;
this.clock = clock;
}
- public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager) {
- this(specGraph, contextManager, SystemClock.instance());
+ public StreamOperatorTask(OperatorSpecGraph specGraph) {
+ this(specGraph, SystemClock.instance());
}
/**
@@ -75,20 +70,13 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
* an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph}
* corresponding to the logical DAG.
*
- * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
* @param context allows initializing and accessing contextual data of this StreamTask
* @throws Exception in case of initialization errors
*/
@Override
- public final void init(Config config, TaskContext context) throws Exception {
-
- // get the user-implemented per task context manager and initialize it
- if (this.contextManager != null) {
- this.contextManager.init(config, context);
- }
-
+ public final void init(Context context) throws Exception {
// create the operator impl DAG corresponding to the logical operator spec DAG
- this.operatorImplGraph = new OperatorImplGraph(specGraph, config, context, clock);
+ this.operatorImplGraph = new OperatorImplGraph(specGraph, context, clock);
}
/**
@@ -133,9 +121,6 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
@Override
public void close() throws Exception {
- if (this.contextManager != null) {
- this.contextManager.close();
- }
if (operatorImplGraph != null) {
operatorImplGraph.close();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index 834777b..c312fac 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -48,8 +48,8 @@ public class TaskFactoryUtil {
if (appDesc instanceof TaskApplicationDescriptorImpl) {
return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory();
} else if (appDesc instanceof StreamApplicationDescriptorImpl) {
- return (StreamTaskFactory) () -> new StreamOperatorTask(((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph(),
- ((StreamApplicationDescriptorImpl) appDesc).getContextManager());
+ return (StreamTaskFactory) () -> new StreamOperatorTask(
+ ((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph());
}
throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
+ "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName()));
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
index 1cf9a9c..a91d663 100644
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
@@ -18,21 +18,20 @@
*/
package org.apache.samza.util;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -106,16 +105,15 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
}
@Override
- public void init(Config config, TaskContext taskContext) {
+ public void init(Context context) {
this.tagToRateLimiterMap = Collections.unmodifiableMap(tagToTargetRateMap.entrySet().stream()
.map(e -> {
String tag = e.getKey();
- int effectiveRate = e.getValue();
- if (taskContext != null) {
- effectiveRate /= taskContext.getSamzaContainerContext().taskNames.size();
- LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d",
- taskContext.getTaskName(), tag, effectiveRate));
- }
+ int numTasksInContainer = context.getContainerContext().getContainerModel().getTasks().keySet().size();
+ int effectiveRate = e.getValue() / numTasksInContainer;
+ TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
+ LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d", taskName, tag,
+ effectiveRate));
return new ImmutablePair<>(tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate));
})
.collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue))
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3c10aae..3292986 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -43,6 +43,7 @@ import org.apache.samza.config._
import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
+import org.apache.samza.context._
import org.apache.samza.job.model.{ContainerModel, JobModel}
import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
import org.apache.samza.serializers._
@@ -122,9 +123,13 @@ object SamzaContainer extends Logging {
def apply(
containerId: String,
jobModel: JobModel,
- config: Config,
customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](),
- taskFactory: TaskFactory[_]) = {
+ taskFactory: TaskFactory[_],
+ jobContext: JobContext,
+ applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
+ applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
+ ) = {
+ val config = jobContext.getConfig
val containerModel = jobModel.getContainers.get(containerId)
val containerName = "samza-container-%s" format containerId
val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
@@ -488,8 +493,10 @@ object SamzaContainer extends Logging {
.asScala
.map(_.getTaskName)
.toSet
- val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry)
+ val containerContext = new ContainerContextImpl(containerModel, samzaContainerMetrics.registry)
+ val applicationContainerContextOption = applicationContainerContextFactoryOption
+ .map(_.create(jobContext, containerContext))
val storeWatchPaths = new util.HashSet[Path]()
@@ -571,6 +578,7 @@ object SamzaContainer extends Logging {
collector,
taskInstanceMetrics.registry,
changeLogSystemStreamPartition,
+ jobContext,
containerContext)
(storeName, storageEngine)
}
@@ -635,13 +643,11 @@ object SamzaContainer extends Logging {
def createTaskInstance(task: Any): TaskInstance = new TaskInstance(
task = task,
- taskName = taskName,
- config = config,
+ taskModel = taskModel,
metrics = taskInstanceMetrics,
systemAdmins = systemAdmins,
consumerMultiplexer = consumerMultiplexer,
collector = collector,
- containerContext = containerContext,
offsetManager = offsetManager,
storageManager = storageManager,
tableManager = tableManager,
@@ -652,7 +658,11 @@ object SamzaContainer extends Logging {
streamMetadataCache = streamMetadataCache,
timerExecutor = timerExecutor,
sideInputSSPs = taskSideInputSSPs,
- sideInputStorageManager = sideInputStorageManager)
+ sideInputStorageManager = sideInputStorageManager,
+ jobContext = jobContext,
+ containerContext = containerContext,
+ applicationContainerContextOption = applicationContainerContextOption,
+ applicationTaskContextFactoryOption = applicationTaskContextFactoryOption)
val taskInstance = createTaskInstance(task)
@@ -708,7 +718,7 @@ object SamzaContainer extends Logging {
info("Samza container setup complete.")
new SamzaContainer(
- containerContext = containerContext,
+ config = config,
taskInstances = taskInstances,
runLoop = runLoop,
systemAdmins = systemAdmins,
@@ -722,10 +732,11 @@ object SamzaContainer extends Logging {
diskSpaceMonitor = diskSpaceMonitor,
hostStatisticsMonitor = memoryStatisticsMonitor,
taskThreadPool = taskThreadPool,
- timerExecutor = timerExecutor)
+ timerExecutor = timerExecutor,
+ containerContext = containerContext,
+ applicationContainerContextOption = applicationContainerContextOption)
}
-
/**
* Builds the set of SSPs for all changelogs on this container.
*/
@@ -741,7 +752,7 @@ object SamzaContainer extends Logging {
}
class SamzaContainer(
- containerContext: SamzaContainerContext,
+ config: Config,
taskInstances: Map[TaskName, TaskInstance],
runLoop: Runnable,
systemAdmins: SystemAdmins,
@@ -756,12 +767,14 @@ class SamzaContainer(
reporters: Map[String, MetricsReporter] = Map(),
jvm: JvmMetrics = null,
taskThreadPool: ExecutorService = null,
- timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor) extends Runnable with Logging {
+ timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor,
+ containerContext: ContainerContext,
+ applicationContainerContextOption: Option[ApplicationContainerContext]) extends Runnable with Logging {
- val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
+ val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
var shutdownHookThread: Thread = null
var jmxServer: JmxServer = null
- val isAutoCommitEnabled = containerContext.config.isAutoCommitEnabled
+ val isAutoCommitEnabled = config.isAutoCommitEnabled
@volatile private var status = SamzaContainerStatus.NOT_STARTED
private var exceptionSeen: Throwable = null
@@ -789,6 +802,7 @@ class SamzaContainer(
status = SamzaContainerStatus.STARTING
jmxServer = new JmxServer()
+ applicationContainerContextOption.foreach(_.start)
startMetrics
startDiagnostics
@@ -841,6 +855,8 @@ class SamzaContainer(
shutdownSecurityManger
shutdownAdmins
+ applicationContainerContextOption.foreach(_.stop)
+
if (!status.equals(SamzaContainerStatus.FAILED)) {
status = SamzaContainerStatus.STOPPED
}
@@ -930,18 +946,18 @@ class SamzaContainer(
}
def startDiagnostics {
- if (containerContext.config.getDiagnosticsEnabled) {
+ if (config.getDiagnosticsEnabled) {
info("Starting diagnostics.")
try {
- val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass).
+ val diagnosticsAppender = Class.forName(config.getDiagnosticsAppenderClass).
getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
}
catch {
case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
error("Failed to instantiate diagnostic appender", e)
throw new ConfigException("Failed to instantiate diagnostic appender class " +
- containerContext.config.getDiagnosticsAppenderClass, e)
+ config.getDiagnosticsAppenderClass, e)
}
}
}
@@ -958,24 +974,25 @@ class SamzaContainer(
}
def storeContainerLocality {
- val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(containerContext.config).getHostAffinityEnabled
+ val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(config).getHostAffinityEnabled
if (isHostAffinityEnabled) {
- val localityManager: LocalityManager = new LocalityManager(containerContext.config, containerContext.metricsRegistry)
- val containerName = "SamzaContainer-" + String.valueOf(containerContext.id)
+ val localityManager: LocalityManager = new LocalityManager(config, containerContext.getContainerMetricsRegistry)
+ val containerId = containerContext.getContainerModel.getId
+ val containerName = "SamzaContainer-" + containerId
info("Registering %s with metadata store" format containerName)
try {
val hostInet = Util.getLocalHost
val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
info("Writing container locality and JMX address to metadata store")
- localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName)
+ localityManager.writeContainerToHostMapping(containerId, hostInet.getHostName)
} catch {
case uhe: UnknownHostException =>
warn("Received UnknownHostException when persisting locality info for container %s: " +
- "%s" format (containerContext.id, uhe.getMessage)) //No-op
+ "%s" format (containerId, uhe.getMessage)) //No-op
case unknownException: Throwable =>
warn("Received an exception when persisting locality info for container %s: " +
- "%s" format (containerContext.id, unknownException.getMessage))
+ "%s" format (containerId, unknownException.getMessage))
} finally {
info("Shutting down locality manager.")
localityManager.close()
@@ -1016,7 +1033,6 @@ class SamzaContainer(
systemAdmins.start
}
-
def startProducers {
info("Registering task instances with producers.")
@@ -1092,7 +1108,6 @@ class SamzaContainer(
systemAdmins.stop
}
-
def shutdownProducers {
info("Shutting down producer multiplexer.")
@@ -1185,4 +1200,4 @@ class SamzaContainer(
hostStatisticsMonitor.stop()
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 9f4fd17..f8e9c63 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -20,15 +20,17 @@
package org.apache.samza.container
+import java.util.Optional
import java.util.concurrent.ScheduledExecutorService
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.job.model.JobModel
+import org.apache.samza.context._
+import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.scheduler.ScheduledCallback
+import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
import org.apache.samza.system._
@@ -36,19 +38,17 @@ import org.apache.samza.table.TableManager
import org.apache.samza.task._
import org.apache.samza.util.{Logging, ScalaJavaUtil}
-import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.Map
class TaskInstance(
val task: Any,
- val taskName: TaskName,
- config: Config,
+ taskModel: TaskModel,
val metrics: TaskInstanceMetrics,
systemAdmins: SystemAdmins,
consumerMultiplexer: SystemConsumers,
collector: TaskInstanceCollector,
- containerContext: SamzaContainerContext,
val offsetManager: OffsetManager = new OffsetManager,
storageManager: TaskStorageManager = null,
tableManager: TableManager = null,
@@ -59,15 +59,23 @@ class TaskInstance(
streamMetadataCache: StreamMetadataCache = null,
timerExecutor : ScheduledExecutorService = null,
sideInputSSPs: Set[SystemStreamPartition] = Set(),
- sideInputStorageManager: TaskSideInputStorageManager = null) extends Logging {
-
+ sideInputStorageManager: TaskSideInputStorageManager = null,
+ jobContext: JobContext,
+ containerContext: ContainerContext,
+ applicationContainerContextOption: Option[ApplicationContainerContext],
+ applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
+) extends Logging {
+
+ val taskName: TaskName = taskModel.getTaskName
val isInitableTask = task.isInstanceOf[InitableTask]
val isWindowableTask = task.isInstanceOf[WindowableTask]
val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
val isClosableTask = task.isInstanceOf[ClosableTask]
val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
- val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
+ val epochTimeScheduler: EpochTimeScheduler = EpochTimeScheduler.create(timerExecutor)
+
+ private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
(storeName: String) => {
if (storageManager != null && storageManager.getStore(storeName).isDefined) {
storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
@@ -77,9 +85,14 @@ class TaskInstance(
null
}
})
-
- val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
- kvStoreSupplier, tableManager, jobModel, streamMetadataCache, timerExecutor)
+ private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
+ new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
+ // need separate field for this instead of using it through Context, since Context throws an exception if it is null
+ private val applicationTaskContextOption = applicationTaskContextFactoryOption.map(_.create(jobContext,
+ containerContext, taskContext, applicationContainerContextOption.orNull))
+ val context = new ContextImpl(jobContext, containerContext, taskContext,
+ Optional.ofNullable(applicationContainerContextOption.orNull),
+ Optional.ofNullable(applicationTaskContextOption.orNull))
// store the (ssp -> if this ssp has caught up) mapping. "caught up"
// means the same ssp in other taskInstances have the same offset as
@@ -88,6 +101,8 @@ class TaskInstance(
scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
+ private val config: Config = jobContext.getConfig
+
val intermediateStreams: Set[String] = config.getStreamIds.filter(config.getIsIntermediateStream).toSet
val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
@@ -126,7 +141,7 @@ class TaskInstance(
if (tableManager != null) {
debug("Starting table manager for taskName: %s" format taskName)
- tableManager.init(containerContext, context)
+ tableManager.init(context)
} else {
debug("Skipping table manager initialization for taskName: %s" format taskName)
}
@@ -136,10 +151,14 @@ class TaskInstance(
if (isInitableTask) {
debug("Initializing task for taskName: %s" format taskName)
- task.asInstanceOf[InitableTask].init(config, context)
+ task.asInstanceOf[InitableTask].init(context)
} else {
debug("Skipping task initialization for taskName: %s" format taskName)
}
+ applicationTaskContextOption.foreach(applicationTaskContext => {
+ debug("Starting application-defined task context for taskName: %s" format taskName)
+ applicationTaskContext.start()
+ })
}
def registerProducers {
@@ -226,7 +245,7 @@ class TaskInstance(
trace("Scheduler for taskName: %s" format taskName)
exceptionHandler.maybeHandle {
- context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry =>
+ epochTimeScheduler.removeReadyTimers().entrySet().foreach { entry =>
entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator)
}
}
@@ -266,6 +285,10 @@ class TaskInstance(
}
def shutdownTask {
+ applicationTaskContextOption.foreach(applicationTaskContext => {
+ debug("Stopping application-defined task context for taskName: %s" format taskName)
+ applicationTaskContext.stop()
+ })
if (task.isInstanceOf[ClosableTask]) {
debug("Shutting down stream task for taskName: %s" format taskName)
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index bec4ec0..929d6a4 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -24,6 +24,7 @@ import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
import org.apache.samza.config.{Config, TaskConfigJava}
import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
+import org.apache.samza.context.JobContextImpl
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.CoordinatorStreamManager
import org.apache.samza.job.{StreamJob, StreamJobFactory}
@@ -112,9 +113,12 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val container = SamzaContainer(
containerId,
jobModel,
- config,
Map[String, MetricsReporter](),
- taskFactory)
+ taskFactory,
+ JobContextImpl.fromConfigWithDefaults(config),
+ Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
+ Option(appDesc.getApplicationTaskContextFactory.orElse(null))
+ )
container.setContainerListener(containerListener)
val threadJob = new ThreadJob(container)