You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/22 16:36:36 UTC
[samza] branch master updated: SAMZA-2172: Make High Level
applications respect job.container.thread.pool.size
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c087f80 SAMZA-2172: Make High Level applications respect job.container.thread.pool.size
c087f80 is described below
commit c087f80cfe2ae576f30585894a5d907f7c5b2cee
Author: mynameborat <bh...@gmail.com>
AuthorDate: Wed May 22 09:36:28 2019 -0700
SAMZA-2172: Make High Level applications respect job.container.thread.pool.size
Refer to https://issues.apache.org/jira/browse/SAMZA-2172 for more details.
Sample log output after this change. The blocking filter now executes on the container thread-pool threads ("Samza Container Thread-N") instead of the run loop thread:
```
2019-04-25 14:41:17.132 [Samza Container Thread-2] WikipediaAsyncApplication [INFO] Executing blocking filter on the thread Samza Container Thread-2
2019-04-25 14:41:17.133 [Samza Container Thread-2] WikipediaAsyncApplication [INFO] Finished executing blocking filter on the thread Samza Container Thread-2
2019-04-25 14:41:17.442 [Samza Container Thread-3] WikipediaAsyncApplication [INFO] Executing blocking filter on the thread Samza Container Thread-3
2019-04-25 14:41:17.442 [Samza Container Thread-3] WikipediaAsyncApplication [INFO] Finished executing blocking filter on the thread Samza Container Thread-3
2019-04-25 14:41:17.487 [Samza Container Thread-0] WikipediaAsyncApplication [INFO] Executing blocking filter on the thread Samza Container Thread-0
```
Author: mynameborat <bh...@gmail.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #1011 from mynameborat/async-api-fix
---
.../org/apache/samza/task/StreamOperatorTask.java | 99 ++++++++++++++--------
.../samza/task/StreamOperatorTaskFactory.java | 30 +++++++
.../org/apache/samza/task/TaskFactoryUtil.java | 28 +++++-
.../apache/samza/container/SamzaContainer.scala | 1 -
.../apache/samza/task/TestStreamOperatorTask.java | 34 +++++++-
.../org/apache/samza/task/TestTaskFactoryUtil.java | 21 +++--
6 files changed, 166 insertions(+), 47 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 387ad2e..a0ec7b8 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -21,6 +21,7 @@ package org.apache.samza.task;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.operators.OperatorSpecGraph;
@@ -47,6 +48,14 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
private final OperatorSpecGraph specGraph;
private final Clock clock;
+ /*
+ * Thread pool used by the task to schedule processing of incoming messages. If job.container.thread.pool.size is
+ * not configured, this will be null. We don't want to create an executor service within StreamOperatorTask due to
+ * following reasons
+ * 1. It is harder to reason about the lifecycle of the executor service
+ * 2. We end up with thread pool proliferation. Especially for jobs with high number of tasks.
+ */
+ private ExecutorService taskThreadPool;
private OperatorImplGraph operatorImplGraph;
/**
@@ -86,7 +95,9 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
/**
* Passes the incoming message envelopes along to the {@link InputOperatorImpl} node
- * for the input {@link SystemStream}.
+ * for the input {@link SystemStream}. It is non-blocking and dispatches the message to the container thread
+ * pool. The thread pool size is configured through job.container.thread.pool.size. In the absence of the config,
+ * the task executes the DAG on the run loop thread.
* <p>
* From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
* its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
@@ -99,40 +110,55 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
@Override
public final void processAsync(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator,
TaskCallback callback) {
- SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
- InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
- if (inputOpImpl != null) {
- CompletionStage<Void> processFuture;
- MessageType messageType = MessageType.of(ime.getMessage());
- switch (messageType) {
- case USER_MESSAGE:
- processFuture = inputOpImpl.onMessageAsync(ime, collector, coordinator);
- break;
-
- case END_OF_STREAM:
- EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
- processFuture =
- inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
- break;
-
- case WATERMARK:
- WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
- processFuture =
- inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator);
- break;
-
- default:
- processFuture = failedFuture(new SamzaException("Unknown message type " + messageType + " encountered."));
- break;
- }
-
- processFuture.whenComplete((val, ex) -> {
- if (ex != null) {
- callback.failure(ex);
- } else {
- callback.complete();
+ Runnable processRunnable = () -> {
+ try {
+ SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
+ InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
+ if (inputOpImpl != null) {
+ CompletionStage<Void> processFuture;
+ MessageType messageType = MessageType.of(ime.getMessage());
+ switch (messageType) {
+ case USER_MESSAGE:
+ processFuture = inputOpImpl.onMessageAsync(ime, collector, coordinator);
+ break;
+
+ case END_OF_STREAM:
+ EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
+ processFuture =
+ inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
+ break;
+
+ case WATERMARK:
+ WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
+ processFuture = inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector,
+ coordinator);
+ break;
+
+ default:
+ processFuture = failedFuture(new SamzaException("Unknown message type " + messageType + " encountered."));
+ break;
}
- });
+
+ processFuture.whenComplete((val, ex) -> {
+ if (ex != null) {
+ callback.failure(ex);
+ } else {
+ callback.complete();
+ }
+ });
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to process the incoming message due to ", e);
+ callback.failure(e);
+ }
+ };
+
+ if (taskThreadPool != null) {
+ LOG.debug("Processing message using thread pool.");
+ taskThreadPool.submit(processRunnable);
+ } else {
+ LOG.debug("Processing message on the run loop thread.");
+ processRunnable.run();
}
}
@@ -153,6 +179,11 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
}
}
+ /* package private setter for TaskFactoryUtil to initialize the taskThreadPool */
+ void setTaskThreadPool(ExecutorService taskThreadPool) {
+ this.taskThreadPool = taskThreadPool;
+ }
+
/* package private for testing */
OperatorImplGraph getOperatorImplGraph() {
return this.operatorImplGraph;
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTaskFactory.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTaskFactory.java
new file mode 100644
index 0000000..0f9cc3b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTaskFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+
+/**
+ * Build {@link StreamOperatorTask} instances.
+ * <p>
+ * Implementations should return a new instance of {@link StreamOperatorTask} for each {@link #createInstance()} invocation.
+ * Note: It is not part of samza-api as it is a temporary hack introduced for SAMZA-2172. It will eventually
+ * go away with SAMZA-2203
+ */
+interface StreamOperatorTaskFactory extends TaskFactory<AsyncStreamTask> {
+}
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index 649483d..85549e4 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -48,7 +48,7 @@ public class TaskFactoryUtil {
if (appDesc instanceof TaskApplicationDescriptorImpl) {
return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory();
} else if (appDesc instanceof StreamApplicationDescriptorImpl) {
- return (AsyncStreamTaskFactory) () -> new StreamOperatorTask(
+ return (StreamOperatorTaskFactory) () -> new StreamOperatorTask(
((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph());
}
throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
@@ -112,6 +112,24 @@ public class TaskFactoryUtil {
return factory;
}
+ boolean isStreamOperatorTaskClass = factory instanceof StreamOperatorTaskFactory;
+
+ /*
+ * Note: Even though StreamOperatorTask is an instanceof AsyncStreamTask, we still need to
+ * adapt it in order to inject the container thread pool. Long term, this will go away once we have the
+ * InternalTaskContext refactor, which would then become the entry point for exposing any of the runtime objects
+ * created in the container.
+ * Refer to SAMZA-2203 for more details.
+ */
+ if (isStreamOperatorTaskClass) {
+ log.info("Adapting StreamOperatorTaskFactory to inject container thread pool");
+ return (AsyncStreamTaskFactory) () -> {
+ StreamOperatorTask operatorTask = (StreamOperatorTask) factory.createInstance();
+ operatorTask.setTaskThreadPool(taskThreadPool);
+ return operatorTask;
+ };
+ }
+
log.info("Converting StreamTask to AsyncStreamTaskAdapter");
return (AsyncStreamTaskFactory) () ->
new AsyncStreamTaskAdapter(((StreamTaskFactory) factory).createInstance(), taskThreadPool);
@@ -122,8 +140,12 @@ public class TaskFactoryUtil {
throw new SamzaException("Either the task class name or the task factory instance is required.");
}
- if (!(factory instanceof StreamTaskFactory) && !(factory instanceof AsyncStreamTaskFactory)) {
- throw new SamzaException(String.format("TaskFactory must be either StreamTaskFactory or AsyncStreamTaskFactory. %s is not supported",
+ boolean isValidFactory = factory instanceof StreamTaskFactory
+ || factory instanceof AsyncStreamTaskFactory
+ || factory instanceof StreamOperatorTaskFactory;
+
+ if (!isValidFactory) {
+ throw new SamzaException(String.format("TaskFactory must be either StreamTaskFactory or AsyncStreamTaskFactory or StreamOperatorTaskFactory. %s is not supported",
factory.getClass()));
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 1b356f6..068d494 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -43,7 +43,6 @@ import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor
import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
import org.apache.samza.context._
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode}
-import org.apache.samza.metadatastore.MetadataStoreFactory
import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
import org.apache.samza.serializers._
import org.apache.samza.serializers.model.SamzaObjectMapper
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index ab5e295..330cab9 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -19,21 +19,22 @@
package org.apache.samza.task;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.samza.context.Context;
import org.apache.samza.context.JobContext;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.util.Clock;
import org.junit.Test;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestStreamOperatorTask {
-
public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask task) {
return task.getOperatorImplGraph();
}
@@ -54,4 +55,29 @@ public class TestStreamOperatorTask {
}
operatorTask.close();
}
+
+ /**
+ * Pass an invalid IME to processAsync. Any exceptions in processAsync should still get propagated through the
+ * task callback.
+ */
+ @Test
+ public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedException {
+ ExecutorService taskThreadPool = Executors.newFixedThreadPool(2);
+ TaskCallback mockTaskCallback = mock(TaskCallback.class);
+ MessageCollector mockMessageCollector = mock(MessageCollector.class);
+ TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+ StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
+ operatorTask.setTaskThreadPool(taskThreadPool);
+
+ CountDownLatch failureLatch = new CountDownLatch(1);
+
+ doAnswer(ctx -> {
+ failureLatch.countDown();
+ return null;
+ }).when(mockTaskCallback).failure(anyObject());
+
+ operatorTask.processAsync(mock(IncomingMessageEnvelope.class), mockMessageCollector,
+ mockTaskCoordinator, mockTaskCallback);
+ failureLatch.await();
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
index aa9a013..5db575d 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
@@ -31,9 +31,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
/**
@@ -95,6 +93,19 @@ public class TestTaskFactoryUtil {
assertEquals(retFactory, mockAsyncStreamFactory);
}
+ @Test
+ public void testFinalizeTaskFactoryForStreamOperatorTask() {
+ TaskFactory mockFactory = mock(StreamOperatorTaskFactory.class);
+ StreamOperatorTask mockStreamOperatorTask = mock(StreamOperatorTask.class);
+ when(mockFactory.createInstance())
+ .thenReturn(mockStreamOperatorTask);
+
+ ExecutorService mockThreadPool = mock(ExecutorService.class);
+ TaskFactory finalizedFactory = TaskFactoryUtil.finalizeTaskFactory(mockFactory, mockThreadPool);
+ finalizedFactory.createInstance();
+ verify(mockStreamOperatorTask, times(1)).setTaskThreadPool(eq(mockThreadPool));
+ }
+
// test getTaskFactory with StreamApplicationDescriptor
@Test
public void testGetTaskFactoryWithStreamAppDescriptor() {
@@ -102,8 +113,8 @@ public class TestTaskFactoryUtil {
OperatorSpecGraph mockSpecGraph = mock(OperatorSpecGraph.class);
when(mockStreamApp.getOperatorSpecGraph()).thenReturn(mockSpecGraph);
TaskFactory streamTaskFactory = TaskFactoryUtil.getTaskFactory(mockStreamApp);
- assertTrue(streamTaskFactory instanceof AsyncStreamTaskFactory);
- AsyncStreamTask streamTask = ((AsyncStreamTaskFactory) streamTaskFactory).createInstance();
+ assertTrue(streamTaskFactory instanceof StreamOperatorTaskFactory);
+ AsyncStreamTask streamTask = ((StreamOperatorTaskFactory) streamTaskFactory).createInstance();
assertTrue(streamTask instanceof StreamOperatorTask);
verify(mockSpecGraph).clone();
}