You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/28 22:16:43 UTC

[samza] branch master updated: SAMZA-2211: [CLEANUP] Move RunLoop to container package

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 21608cb  SAMZA-2211: [CLEANUP] Move RunLoop to container package
21608cb is described below

commit 21608cb8c2d115a32fd94adaaea2af7d69d3c689
Author: mynameborat <bh...@gmail.com>
AuthorDate: Tue May 28 15:16:37 2019 -0700

    SAMZA-2211: [CLEANUP] Move RunLoop to container package
    
    prateekm xinyuiscool please take a look
    
    Author: mynameborat <bh...@gmail.com>
    
    Reviewers: Prateek Maheshwari <pm...@apache.org>
    
    Closes #1046 from mynameborat/move-run-loop
---
 .../apache/samza/{task => container}/RunLoop.java  | 37 ++++++++++++++--------
 .../org/apache/samza/container/RunLoopFactory.java |  1 -
 .../samza/container/SamzaContainerListener.java    | 16 +++++-----
 .../{ => container}/SamzaContainerStatus.java      |  6 ++--
 .../samza/scheduler/CallbackSchedulerImpl.java     |  3 --
 .../{task => scheduler}/EpochTimeScheduler.java    |  5 ++-
 .../org/apache/samza/task/TaskCallbackImpl.java    | 18 ++++++++++-
 .../apache/samza/task/TaskCallbackListener.java    |  2 +-
 .../org/apache/samza/task/TaskCallbackManager.java |  2 +-
 .../apache/samza/container/SamzaContainer.scala    |  2 +-
 .../org/apache/samza/container/TaskInstance.scala  |  2 +-
 .../samza/{task => container}/TestRunLoop.java     | 23 ++++++++------
 .../samza/processor/TestStreamProcessor.java       |  4 +--
 .../samza/scheduler/TestCallbackSchedulerImpl.java |  1 -
 .../TestEpochTimeScheduler.java                    |  4 ++-
 .../samza/container/TestSamzaContainer.scala       |  2 +-
 .../samza/processor/StreamProcessorTestUtils.scala |  2 +-
 17 files changed, 78 insertions(+), 52 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/task/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
similarity index 95%
rename from samza-core/src/main/java/org/apache/samza/task/RunLoop.java
rename to samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 004c31d..a509a27 100644
--- a/samza-core/src/main/java/org/apache/samza/task/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.task;
+package org.apache.samza.container;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -34,13 +34,18 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.container.TaskInstance;
-import org.apache.samza.container.TaskInstanceMetrics;
-import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.CoordinatorRequests;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+import org.apache.samza.task.TaskCallbackImpl;
+import org.apache.samza.task.TaskCallbackListener;
+import org.apache.samza.task.TaskCallbackManager;
+import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.HighResolutionClock;
 import org.apache.samza.util.Throttleable;
 import org.apache.samza.util.ThrottlingScheduler;
@@ -50,7 +55,13 @@ import scala.collection.JavaConverters;
 
 
 /**
- * The RunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s.
+ * The run loop supports both single-threaded and multi-threaded execution models.
+ *    <p>
+ *      If job.container.thread.pool.size &gt; 1 (multi-threaded), operations like commit, window and timer for all tasks within a container
+ *      happens on a thread pool.
+ *      If job.container.thread.pool.size &lt; 1 (single-threaded), operations for all tasks are multiplexed onto one execution thread.
+ *    </p>.
+ *    Note: In both models, process/processAsync for all tasks is invoked on the run loop thread.
  */
 public class RunLoop implements Runnable, Throttleable {
   private static final Logger log = LoggerFactory.getLogger(RunLoop.class);
@@ -340,7 +351,7 @@ public class RunLoop implements Runnable, Throttleable {
   }
 
   /**
-   * The AsyncTaskWorker encapsulates the states of an {@link AsyncStreamTask}. If the task becomes ready, it
+   * The AsyncTaskWorker encapsulates the states of an {@link org.apache.samza.task.AsyncStreamTask}. If the task becomes ready, it
    * will run the task asynchronously. It runs window and commit in the provided thread pool.
    */
   private class AsyncTaskWorker implements TaskCallbackListener {
@@ -596,7 +607,7 @@ public class RunLoop implements Runnable, Throttleable {
      */
     @Override
     public void onComplete(final TaskCallback callback) {
-      long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs;
+      long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).getTimeCreatedNs();
       callbackExecutor.schedule(new Runnable() {
         @Override
         public void run() {
@@ -604,20 +615,20 @@ public class RunLoop implements Runnable, Throttleable {
             state.doneProcess();
             state.taskMetrics.asyncCallbackCompleted().inc();
             TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
-            containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs);
+            containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.getTimeCreatedNs());
             log.trace("Got callback complete for task {}, ssp {}",
-                callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
+                callbackImpl.getTaskName(), callbackImpl.getEnvelope().getSystemStreamPartition());
 
             List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callbackImpl);
             for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
-              IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
+              IncomingMessageEnvelope envelope = callbackToUpdate.getEnvelope();
               log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update offset
               task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update coordinator
-              coordinatorRequests.update(callbackToUpdate.coordinator);
+              coordinatorRequests.update(callbackToUpdate.getCoordinator());
             }
           } catch (Throwable t) {
             log.error("Error marking process as complete.", t);
@@ -641,7 +652,7 @@ public class RunLoop implements Runnable, Throttleable {
         abort(t);
         // update pending count, but not offset
         TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
-        log.error("Got callback failure for task {}", callbackImpl.taskName, t);
+        log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
       } catch (Throwable e) {
         log.error("Error marking process as failed.", e);
       } finally {
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index d1280fa..b50a270 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -22,7 +22,6 @@ package org.apache.samza.container;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.system.SystemConsumers;
-import org.apache.samza.task.RunLoop;
 import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
index 87321b7..db532c5 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -24,22 +24,22 @@ package org.apache.samza.container;
 public interface SamzaContainerListener {
 
   /**
-   * Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED}
-   * and is about to transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence.
+   * Method invoked when the {@link SamzaContainer} state is {@link SamzaContainerStatus#NOT_STARTED}
+   * and is about to transition to {@link SamzaContainerStatus#STARTING} to start the initialization sequence.
    */
   void beforeStart();
 
   /**
    *  Method invoked after the {@link SamzaContainer} has successfully transitioned to
-   *  the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the
-   *  {@link org.apache.samza.task.RunLoop}
+   *  the {@link SamzaContainerStatus#STARTED} state and is about to start the
+   *  {@link RunLoop}
    */
   void afterStart();
 
   /**
    *  Method invoked after the {@link SamzaContainer} has successfully transitioned to
-   *  {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
-   *  {@link org.apache.samza.SamzaContainerStatus}
+   *  {@link SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
+   *  {@link SamzaContainerStatus}
    *  <br>
    *  <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
    *  exceptions/errors.
@@ -48,8 +48,8 @@ public interface SamzaContainerListener {
 
   /**
    *  Method invoked after the {@link SamzaContainer} has  transitioned to
-   *  {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
-   *  {@link org.apache.samza.SamzaContainerStatus}
+   *  {@link SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
+   *  {@link SamzaContainerStatus}
    *  <br>
    *  <b>Note</b>: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}.
    *  @param t Throwable that caused the container failure.
diff --git a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java
similarity index 93%
rename from samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
rename to samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java
index 3b18138..878054e 100644
--- a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerStatus.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza;
+package org.apache.samza.container;
 
 
 /**
@@ -44,12 +44,12 @@ public enum  SamzaContainerStatus {
 
   /**
    * Indicates that the container is starting all the components required by the
-   * {@link org.apache.samza.task.RunLoop} for processing
+   * {@link RunLoop} for processing
    */
   STARTING,
 
   /**
-   * Indicates that the container started the {@link org.apache.samza.task.RunLoop}
+   * Indicates that the container started the {@link RunLoop}
    */
   STARTED,
 
diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
index 1f87c7c..fe8e227 100644
--- a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
@@ -18,9 +18,6 @@
  */
 package org.apache.samza.scheduler;
 
-import org.apache.samza.task.EpochTimeScheduler;
-
-
 /**
  * Delegates to {@link EpochTimeScheduler}. This is useful because it provides a write-only interface for user-facing
  * purposes.
diff --git a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
similarity index 97%
rename from samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
rename to samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
index 820a6ae..ddc5b29 100644
--- a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.task;
+package org.apache.samza.scheduler;
 
 import java.util.Map;
 import java.util.TreeMap;
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.samza.scheduler.ScheduledCallback;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -81,7 +80,7 @@ public class EpochTimeScheduler {
     }
   }
 
-  void registerListener(TimerListener listener) {
+  public void registerListener(TimerListener listener) {
     timerListener = listener;
 
     if (!readyTimers.isEmpty()) {
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
index 5c178aa..0ba2032 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
  * callback is called multiple times, it will throw IllegalStateException
  * to the listener.
  */
-class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
+public class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
   private static final Logger log = LoggerFactory.getLogger(TaskCallbackImpl.class);
 
   final TaskName taskName;
@@ -60,6 +60,22 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> {
     this.timeCreatedNs = timeCreatedNs;
   }
 
+  public TaskName getTaskName() {
+    return taskName;
+  }
+
+  public IncomingMessageEnvelope getEnvelope() {
+    return envelope;
+  }
+
+  public ReadableCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+  public long getTimeCreatedNs() {
+    return timeCreatedNs;
+  }
+
   @Override
   public void complete() {
     if (scheduledFuture != null) {
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
index de4ee58..77dad98 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java
@@ -24,7 +24,7 @@ package org.apache.samza.task;
  * callback events. If the callback completes with success, onComplete() will be fired.
  * If the callback fails, onFailure() will be fired.
  */
-interface TaskCallbackListener {
+public interface TaskCallbackListener {
   void onComplete(TaskCallback callback);
   void onFailure(TaskCallback callback, Throwable t);
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index 9d87e6e..2d49de7 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -41,7 +41,7 @@ import org.apache.samza.util.ThreadUtil;
  * for the callbacks based on the sequence number, and updates the offsets for checkpointing
  * by always moving forward to the latest contiguous callback (uses the high watermark).
  */
-class TaskCallbackManager {
+public class TaskCallbackManager {
 
   private static final class TaskCallbacks {
     private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>();
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c38274b..df195cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -55,7 +55,7 @@ import org.apache.samza.table.TableManager
 import org.apache.samza.task._
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{Util, _}
-import org.apache.samza.{SamzaContainerStatus, SamzaException}
+import org.apache.samza.SamzaException
 
 import scala.collection.JavaConverters._
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index ae6db22..884244e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -30,7 +30,7 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.context._
 import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
+import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback}
 import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.TaskStorageManager
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
similarity index 98%
rename from samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java
rename to samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 1aa43e5..41e55ed 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.task;
+package org.apache.samza.container;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,11 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.checkpoint.OffsetManager;
-import org.apache.samza.container.SamzaContainerMetrics;
-import org.apache.samza.container.TaskInstance;
-import org.apache.samza.container.TaskInstanceExceptionHandler;
-import org.apache.samza.container.TaskInstanceMetrics;
-import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.job.model.TaskModel;
@@ -47,6 +42,14 @@ import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.EndOfStreamListenerTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackImpl;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.task.WindowableTask;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -294,7 +297,7 @@ public class TestRunLoop {
     return new TestCode() {
       @Override
       public void run(TaskCallback callback) {
-        IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
+        IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope();
         if (envelope.equals(envelope0)) {
           // process first message will wait till the second one is processed
           try {
@@ -694,7 +697,7 @@ public class TestRunLoop {
     final CountDownLatch firstMsgCompletionLatch = new CountDownLatch(1);
     final CountDownLatch secondMsgCompletionLatch = new CountDownLatch(1);
     task0.callbackHandler = callback -> {
-      IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
+      IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).getEnvelope();
       try {
         if (envelope.equals(firstMsg)) {
           firstMsgCompletionLatch.await();
@@ -745,7 +748,7 @@ public class TestRunLoop {
     CountDownLatch commitLatch = new CountDownLatch(1);
     task0.commitHandler = callback -> {
       TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
-      if (taskCallback.envelope.equals(envelope3)) {
+      if (taskCallback.getEnvelope().equals(envelope3)) {
         try {
           commitLatch.await();
         } catch (InterruptedException e) {
@@ -756,7 +759,7 @@ public class TestRunLoop {
 
     task0.callbackHandler = callback -> {
       TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
-      if (taskCallback.envelope.equals(envelope0)) {
+      if (taskCallback.getEnvelope().equals(envelope0)) {
         // Both the process call has gone through when the first commit is in progress.
         assertEquals(2, containerMetrics.processes().getCount());
         assertEquals(0, containerMetrics.commits().getCount());
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 75c39ac..d5bce16 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -27,11 +27,11 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.SamzaContainerStatus;
+import org.apache.samza.container.SamzaContainerStatus;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.task.RunLoop;
+import org.apache.samza.container.RunLoop;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.ContainerModel;
diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
index 649a4e4..58ef1c1 100644
--- a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.scheduler;
 
-import org.apache.samza.task.EpochTimeScheduler;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
similarity index 97%
rename from samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
rename to samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
index da137e6..5db908c 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
@@ -17,12 +17,14 @@
  * under the License.
  */
 
-package org.apache.samza.task;
+package org.apache.samza.scheduler;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index e75fe54..0d267a1 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -31,7 +31,7 @@ import org.apache.samza.metrics.{Gauge, MetricsReporter, Timer}
 import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager}
 import org.apache.samza.system._
 import org.apache.samza.task.{StreamTaskFactory, TaskFactory}
-import org.apache.samza.{Partition, SamzaContainerStatus}
+import org.apache.samza.Partition
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.mockito.Matchers.{any, notNull}
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 5979bc4..5ab7635 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -29,7 +29,7 @@ import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.ContainerStorageManager
 import org.apache.samza.system._
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.{RunLoop, StreamTask, TaskInstanceCollector}
+import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
 import org.mockito.Mockito