You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/28 22:16:43 UTC
[samza] branch master updated: SAMZA-2211: [CLEANUP] Move RunLoop to container package
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 21608cb SAMZA-2211: [CLEANUP] Move RunLoop to container package
21608cb is described below
commit 21608cb8c2d115a32fd94adaaea2af7d69d3c689
Author: mynameborat <bh...@gmail.com>
AuthorDate: Tue May 28 15:16:37 2019 -0700
SAMZA-2211: [CLEANUP] Move RunLoop to container package
prateekm xinyuiscool please take a look
Author: mynameborat <bh...@gmail.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #1046 from mynameborat/move-run-loop
---
.../apache/samza/{task => container}/RunLoop.java | 37 ++++++++++++++--------
.../org/apache/samza/container/RunLoopFactory.java | 1 -
.../samza/container/SamzaContainerListener.java | 16 +++++-----
.../{ => container}/SamzaContainerStatus.java | 6 ++--
.../samza/scheduler/CallbackSchedulerImpl.java | 3 --
.../{task => scheduler}/EpochTimeScheduler.java | 5 ++-
.../org/apache/samza/task/TaskCallbackImpl.java | 18 ++++++++++-
.../apache/samza/task/TaskCallbackListener.java | 2 +-
.../org/apache/samza/task/TaskCallbackManager.java | 2 +-
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../org/apache/samza/container/TaskInstance.scala | 2 +-
.../samza/{task => container}/TestRunLoop.java | 23 ++++++++------
.../samza/processor/TestStreamProcessor.java | 4 +--
.../samza/scheduler/TestCallbackSchedulerImpl.java | 1 -
.../TestEpochTimeScheduler.java | 4 ++-
.../samza/container/TestSamzaContainer.scala | 2 +-
.../samza/processor/StreamProcessorTestUtils.scala | 2 +-
17 files changed, 78 insertions(+), 52 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/task/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
similarity index 95%
rename from samza-core/src/main/java/org/apache/samza/task/RunLoop.java
rename to samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 004c31d..a509a27 100644
--- a/samza-core/src/main/java/org/apache/samza/task/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.samza.task;
+package org.apache.samza.container;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -34,13 +34,18 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.container.TaskInstance;
-import org.apache.samza.container.TaskInstanceMetrics;
-import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.CoordinatorRequests;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+import org.apache.samza.task.TaskCallbackImpl;
+import org.apache.samza.task.TaskCallbackListener;
+import org.apache.samza.task.TaskCallbackManager;
+import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.Throttleable;
import org.apache.samza.util.ThrottlingScheduler;
@@ -50,7 +55,13 @@ import scala.collection.JavaConverters;
/**
- * The RunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s.
+ * The run loop supports both single-threaded and multi-threaded execution models.
+ * <p>
+ * If job.container.thread.pool.size > 1 (multi-threaded), operations like commit, window and timer for all tasks within a container
+ * happen on a thread pool.
+ * If job.container.thread.pool.size < 1 (single-threaded), operations for all tasks are multiplexed onto one execution thread.
+ * </p>
+ * Note: In both models, process/processAsync for all tasks is invoked on the run loop thread.
*/
public class RunLoop implements Runnable, Throttleable {
private static final Logger log = LoggerFactory.getLogger(RunLoop.class);
@@ -340,7 +351,7 @@ public class RunLoop implements Runnable, Throttleable {
}
/**
- * The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it
+ * The AsyncTaskWorker encapsulates the states of an {@link org.apache.samza.task.AsyncStreamTask}. If the task becomes ready, it
* will run the task asynchronously. It runs window and commit in the provided thread pool.
*/
private class AsyncTaskWorker implements TaskCallbackListener {
@@ -596,7 +607,7 @@ public class RunLoop implements Runnable, Throttleable {
*/
@Override
public void onComplete(final TaskCallback callback) {
- long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
+ long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).getTimeCreatedNs();
callbackExecutor.schedule(new Runnable() {
@Override
public void run() {
@@ -604,20 +615,20 @@ public class RunLoop implements Runnable, Throttleable {
state.doneProcess();
state.taskMetrics.asyncCallbackCompleted().inc();
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
- containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs);
+ containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.getTimeCreatedNs());
log.trace("Got callback complete for task {}, ssp {}",
- callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
+ callbackImpl.getTaskName(), callbackImpl.getEnvelope().getSystemStreamPartition());
List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callbackImpl);
for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
- IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
+ IncomingMessageEnvelope envelope = callbackToUpdate.getEnvelope();
log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
// update offset
task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
// update coordinator
- coordinatorRequests.update(callbackToUpdate.coordinator);
+ coordinatorRequests.update(callbackToUpdate.getCoordinator());
}
} catch (Throwable t) {
log.error("Error marking process as complete.", t);
@@ -641,7 +652,7 @@ public class RunLoop implements Runnable, Throttleable {
abort(t);
// update pending count, but not offset
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
- log.error("Got callback failure for task {}", callbackImpl.taskName, t);
+ log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
} catch (Throwable e) {
log.error("Error marking process as failed.", e);
} finally {
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index d1280fa..b50a270 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -22,7 +22,6 @@ package org.apache.samza.container;
import org.apache.samza.SamzaException;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.system.SystemConsumers;
-import org.apache.samza.task.RunLoop;
import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
index 87321b7..db532c5 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -24,22 +24,22 @@ package org.apache.samza.container;
public interface SamzaContainerListener {
/**
- * Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED}
- * and is about to transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence.
+ * Method invoked when the {@link SamzaContainer} state is {@link SamzaContainerStatus#NOT_STARTED}
+ * and is about to transition to {@link SamzaContainerStatus#STARTING} to start the initialization sequence.
*/
void beforeStart();
/**
* Method invoked after the {@link SamzaContainer} has successfully transitioned to
- * the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the
- * {@link org.apache.samza.task.RunLoop}
+ * the {@link SamzaContainerStatus#STARTED} state and is about to start the
+ * {@link RunLoop}
*/
void afterStart();
/**
* Method invoked after the {@link SamzaContainer} has successfully transitioned to
- * {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
- * {@link org.apache.samza.SamzaContainerStatus}
+ * {@link SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
+ * {@link SamzaContainerStatus}
* <br>
* <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
* exceptions/errors.
@@ -48,8 +48,8 @@ public interface SamzaContainerListener {
/**
* Method invoked after the {@link SamzaContainer} has transitioned to
- * {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
- * {@link org.apache.samza.SamzaContainerStatus}
+ * {@link SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
+ * {@link SamzaContainerStatus}
* <br>
* <b>Note</b>: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}.
* @param t Throwable that caused the container failure.
diff --git a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java
similarity index 93%
rename from samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
rename to samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java
index 3b18138..878054e 100644
--- a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza;
+package org.apache.samza.container;
/**
@@ -44,12 +44,12 @@ public enum SamzaContainerStatus {
/**
* Indicates that the container is starting all the components required by the
- * {@link org.apache.samza.task.RunLoop} for processing
+ * {@link RunLoop} for processing
*/
STARTING,
/**
- * Indicates that the container started the {@link org.apache.samza.task.RunLoop}
+ * Indicates that the container started the {@link RunLoop}
*/
STARTED,
diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
index 1f87c7c..fe8e227 100644
--- a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
@@ -18,9 +18,6 @@
*/
package org.apache.samza.scheduler;
-import org.apache.samza.task.EpochTimeScheduler;
-
-
/**
* Delegates to {@link EpochTimeScheduler}. This is useful because it provides a write-only interface for user-facing
* purposes.
diff --git a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
similarity index 97%
rename from samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
rename to samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
index 820a6ae..ddc5b29 100644
--- a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.samza.task;
+package org.apache.samza.scheduler;
import java.util.Map;
import java.util.TreeMap;
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.samza.scheduler.ScheduledCallback;
import static com.google.common.base.Preconditions.checkState;
@@ -81,7 +80,7 @@ public class EpochTimeScheduler {
}
}
- void registerListener(TimerListener listener) {
+ public void registerListener(TimerListener listener) {
timerListener = listener;
if (!readyTimers.isEmpty()) {
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
index 5c178aa..0ba2032 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
* callback is called multiple times, it will throw IllegalStateException
* to the listener.
*/
-class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
+public class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class);
final TaskName taskName;
@@ -60,6 +60,22 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
this.timeCreatedNs = timeCreatedNs;
}
+ public TaskName getTaskName() {
+ return taskName;
+ }
+
+ public IncomingMessageEnvelope getEnvelope() {
+ return envelope;
+ }
+
+ public ReadableCoordinator getCoordinator() {
+ return coordinator;
+ }
+
+ public long getTimeCreatedNs() {
+ return timeCreatedNs;
+ }
+
@Override
public void complete() {
if (scheduledFuture != null) {
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
index de4ee58..77dad98 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
@@ -24,7 +24,7 @@ package org.apache.samza.task;
* callback events. If the callback completes with success, onComplete() will be fired.
* If the callback fails, onFailure() will be fired.
*/
-interface TaskCallbackListener {
+public interface TaskCallbackListener {
void onComplete(TaskCallback callback);
void onFailure(TaskCallback callback, Throwable t);
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index 9d87e6e..2d49de7 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -41,7 +41,7 @@ import org.apache.samza.util.ThreadUtil;
* for the callbacks based on the sequence number, and updates the offsets for checkpointing
* by always moving forward to the latest contiguous callback (uses the high watermark).
*/
-class TaskCallbackManager {
+public class TaskCallbackManager {
private static final class TaskCallbacks {
private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>();
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c38274b..df195cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -55,7 +55,7 @@ import org.apache.samza.table.TableManager
import org.apache.samza.task._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Util, _}
-import org.apache.samza.{SamzaContainerStatus, SamzaException}
+import org.apache.samza.SamzaException
import scala.collection.JavaConverters._
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index ae6db22..884244e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -30,7 +30,7 @@ import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.context._
import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
+import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback}
import org.apache.samza.startpoint.Startpoint
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.TaskStorageManager
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
similarity index 98%
rename from samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java
rename to samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 1aa43e5..41e55ed 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.samza.task;
+package org.apache.samza.container;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,11 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.OffsetManager;
-import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.container.TaskInstance;
-import org.apache.samza.container.TaskInstanceExceptionHandler;
-import org.apache.samza.container.TaskInstanceMetrics;
-import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.TaskModel;
@@ -47,6 +42,14 @@ import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.TestSystemConsumers;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.EndOfStreamListenerTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackImpl;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.task.WindowableTask;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -294,7 +297,7 @@ public class TestRunLoop {
return new TestCode() {
@Override
public void run(TaskCallback callback) {
- IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
+ IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope();
if (envelope.equals(envelope0)) {
// process first message will wait till the second one is processed
try {
@@ -694,7 +697,7 @@ public class TestRunLoop {
final CountDownLatch firstMsgCompletionLatch = new CountDownLatch(1);
final CountDownLatch secondMsgCompletionLatch = new CountDownLatch(1);
task0.callbackHandler = callback -> {
- IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
+ IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope();
try {
if (envelope.equals(firstMsg)) {
firstMsgCompletionLatch.await();
@@ -745,7 +748,7 @@ public class TestRunLoop {
CountDownLatch commitLatch = new CountDownLatch(1);
task0.commitHandler = callback -> {
TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
- if (taskCallback.envelope.equals(envelope3)) {
+ if (taskCallback.getEnvelope().equals(envelope3)) {
try {
commitLatch.await();
} catch (InterruptedException e) {
@@ -756,7 +759,7 @@ public class TestRunLoop {
task0.callbackHandler = callback -> {
TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
- if (taskCallback.envelope.equals(envelope0)) {
+ if (taskCallback.getEnvelope().equals(envelope0)) {
// Both the process call has gone through when the first commit is in progress.
assertEquals(2, containerMetrics.processes().getCount());
assertEquals(0, containerMetrics.commits().getCount());
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 75c39ac..d5bce16 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -27,11 +27,11 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.SamzaContainerStatus;
+import org.apache.samza.container.SamzaContainerStatus;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.task.RunLoop;
+import org.apache.samza.container.RunLoop;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.job.model.ContainerModel;
diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
index 649a4e4..58ef1c1 100644
--- a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.samza.scheduler;
-import org.apache.samza.task.EpochTimeScheduler;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
similarity index 97%
rename from samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
rename to samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
index da137e6..5db908c 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
@@ -17,12 +17,14 @@
* under the License.
*/
-package org.apache.samza.task;
+package org.apache.samza.scheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index e75fe54..0d267a1 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -31,7 +31,7 @@ import org.apache.samza.metrics.{Gauge, MetricsReporter, Timer}
import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager}
import org.apache.samza.system._
import org.apache.samza.task.{StreamTaskFactory, TaskFactory}
-import org.apache.samza.{Partition, SamzaContainerStatus}
+import org.apache.samza.Partition
import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.Matchers.{any, notNull}
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 5979bc4..5ab7635 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -29,7 +29,7 @@ import org.apache.samza.serializers.SerdeManager
import org.apache.samza.storage.ContainerStorageManager
import org.apache.samza.system._
import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.{RunLoop, StreamTask, TaskInstanceCollector}
+import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
import org.mockito.Mockito