You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/03/10 01:33:19 UTC

[10/28] reef git commit: [REEF-1942] Add ability for .NET Client to wait for Driver to complete.

[REEF-1942] Add ability for .NET Client to wait for Driver to complete.

  * Added `IJobSubmissionResult.WaitForDriverToFinish()` which blocks
    until the Driver is done. This uses a new HTTP handler on the Java
    side of the Driver.
  * Added `DriverStatusHTTPHandler` to REEF / Java to query the Driver's
    status via HTTP and added that Handler to the Driver Configuration
    used by REEF.NET.
  * Added a call to block at the end of HelloREEF.

This change also adds a bunch of infrastructure to make this more
informative in the future, e.g. `DriverStatus` in .NET. To keep this
focused, I did not expose that machinery.

JIRA:
  [REEF-1942](https://issues.apache.org/jira/browse/REEF-1942)

Pull Request:
  This closes #1408


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fece6294
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fece6294
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fece6294

Branch: refs/heads/REEF-335
Commit: fece629495a8a4946136577658d3d81356166354
Parents: bee789a
Author: Markus Weimer <we...@apache.org>
Authored: Wed Nov 1 10:28:14 2017 -0700
Committer: jwang98052 <ju...@apache.org>
Committed: Thu Nov 2 18:05:44 2017 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Client/API/DriverStatus.cs  |  96 ++++++++++++++
 .../Common/IJobSubmissionResult.cs              |   7 +
 .../Common/JobSubmissionResult.cs               |  40 +++++-
 .../Org.Apache.REEF.Client.csproj               |   1 +
 .../HelloREEF.cs                                |   6 +-
 .../apache/reef/bridge/client/Constants.java    |  10 ++
 .../bridge/client/DriverStatusHTTPHandler.java  | 129 +++++++++++++++++++
 .../client/TestDriverStatusHTTPHandler.java     | 122 ++++++++++++++++++
 8 files changed, 405 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/cs/Org.Apache.REEF.Client/API/DriverStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/DriverStatus.cs b/lang/cs/Org.Apache.REEF.Client/API/DriverStatus.cs
new file mode 100644
index 0000000..d9934a9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/API/DriverStatus.cs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Client.API
+{
+    /// <summary>
+    /// Represents the status of a Driver.
+    /// </summary>
+    internal enum DriverStatus
+    {
+        /// <summary>
+        /// Driver is initializing.
+        /// </summary>
+        INIT,
+
+        /// <summary>
+        /// Driver is running.
+        /// </summary>
+        RUNNING,
+
+        /// <summary>
+        /// Driver has cleanly exited.
+        /// </summary>
+        DONE,
+
+        /// <summary>
+        /// Driver is suspended.
+        /// </summary>
+        SUSPEND,
+
+        /// <summary>
+        /// Driver was killed.
+        /// </summary>
+        KILLED,
+
+        /// <summary>
+        /// Driver Failed.
+        /// </summary>
+        FAILED,
+
+        /// <summary>
+        /// Driver was RUNNING, but is no longer reachable for unknown reasons.
+        /// </summary>
+        UNKNOWN_EXITED
+    }
+
+    /// <summary>
+    /// Extension methods for DriverStatus
+    /// </summary>
+    internal static class DriverStatusMethods
+    {
+        /// <summary>
+        /// Indicates that the Driver is active: INIT or RUNNING.
+        /// </summary>
+        /// <param name="status"></param>
+        /// <returns>Whether the Driver is active.</returns>
+        public static bool IsActive(this DriverStatus status)
+        {
+            return status == DriverStatus.RUNNING || status == DriverStatus.INIT;
+        }
+
+        /// <summary>
+        /// Indicates that the Driver is not active (INIT or RUNNING).
+        /// </summary>
+        /// <param name="status"></param>
+        /// <returns>Whether the driver is not active.</returns>
+        public static bool IsNotActive(this DriverStatus status)
+        {
+            return !IsActive(status);
+        }
+
+        /// <summary>
+        /// Parses a string representation of a DriverStatus.
+        /// </summary>
+        /// <param name="statusString">The string to parse.</param>
+        /// <returns>The DriverStatus represented in the string.</returns>
+        public static DriverStatus Parse(string statusString)
+        {
+            return (DriverStatus)System.Enum.Parse(typeof(DriverStatus), statusString);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
index 325d011..328677a 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
@@ -45,5 +45,12 @@ namespace Org.Apache.REEF.Client.Common
         /// Get Yarn application id after Job is submited
         /// </summary>
         string AppId { get; }
+
+        /// <summary>
+        /// Waits for the Driver to complete.
+        /// </summary>
+        /// <exception cref="System.Net.WebException">If the Driver cannot be reached.</exception>
+        [Unstable("0.17", "Uses the HTTP server in the Java Driver. Might not work if that cannot be reached.")]
+        void WaitForDriverToFinish();
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
index 7c8a7d4..3880433 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
@@ -63,7 +63,7 @@ namespace Org.Apache.REEF.Client.Common
         /// <summary>
         /// Returns http end point of the web server running in the driver
         /// </summary>
-        public string DriverUrl 
+        public string DriverUrl
         {
             get { return _driverUrl; }
         }
@@ -96,6 +96,36 @@ namespace Org.Apache.REEF.Client.Common
             return task.Result;
         }
 
+        public void WaitForDriverToFinish()
+        {
+            DriverStatus status = FetchDriverStatus();
+            
+            while (status.IsActive())
+            {
+                try
+                {
+                    status = FetchDriverStatus();
+                }
+                catch (System.Net.WebException)
+                {
+                    // If we no longer can reach the Driver, it must have exited.
+                    status = DriverStatus.UNKNOWN_EXITED;
+                }
+            }
+        }
+
+        private DriverStatus FetchDriverStatus()
+        {
+            string statusUrl = DriverUrl + "driverstatus/v1";
+            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(statusUrl);
+            using (StreamReader reader = new StreamReader(request.GetResponse().GetResponseStream()))
+            {
+                string statusString = reader.ReadToEnd();
+                LOGGER.Log(Level.Verbose, "Status received: {0}", statusString);
+                return DriverStatusMethods.Parse(statusString);
+            }
+        }
+
         protected abstract string GetDriverUrl(string filepath);
 
         enum UrlResultKind
@@ -116,7 +146,7 @@ namespace Org.Apache.REEF.Client.Common
                 var rmList = new List<string>();
                 var rmUri = sr.ReadLine();
                 while (rmUri != null)
-                {                    
+                {
                     rmList.Add(rmUri);
                     rmUri = sr.ReadLine();
                 }
@@ -134,12 +164,12 @@ namespace Org.Apache.REEF.Client.Common
             LOGGER.Log(Level.Warning, "CallUrl result " + result.Item2);
             return result.Item2;
         }
-        
+
         internal async Task<string> GetAppIdTrackingUrl(string url)
         {
             var result = await TryGetUri(url);
-            if (HasCommandFailed(result) ||  
-                result.Item2 == null)                
+            if (HasCommandFailed(result) ||
+                result.Item2 == null)
             {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index a0b4f75..00c6d87 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -65,6 +65,7 @@ under the License.
     <Compile Include="API\AppParameters.cs" />
     <Compile Include="API\AppParametersBuilder.cs" />
     <Compile Include="API\ClientFactory.cs" />
+    <Compile Include="API\DriverStatus.cs" />
     <Compile Include="API\Exceptions\ClasspathException.cs" />
     <Compile Include="API\Exceptions\JavaNotFoundException.cs" />
     <Compile Include="API\IREEFClient.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 1164719..b1d0cd3 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Client.Common;
 
 namespace Org.Apache.REEF.Examples.HelloREEF
 {
@@ -68,7 +69,10 @@ namespace Org.Apache.REEF.Examples.HelloREEF
                 .SetJavaLogLevel(JavaLoggingSetting.Verbose)
                 .Build();
 
-            _reefClient.Submit(helloJobRequest);
+            IJobSubmissionResult jobSubmissionResult = _reefClient.SubmitAndGetJobStatus(helloJobRequest);
+
+            // Wait for the Driver to complete.
+            jobSubmissionResult.WaitForDriverToFinish();
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
index ac41f9f..0eaef00 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java
@@ -23,8 +23,10 @@ import org.apache.reef.client.DriverServiceConfiguration;
 import org.apache.reef.client.DriverRestartConfiguration;
 import org.apache.reef.io.network.naming.NameServerConfiguration;
 import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.webserver.HttpHandlerConfiguration;
 import org.apache.reef.webserver.HttpServerReefEventHandler;
 import org.apache.reef.webserver.ReefEventStateManager;
@@ -34,6 +36,8 @@ import org.apache.reef.webserver.ReefEventStateManager;
  */
 public final class Constants {
 
+  private static final Tang TANG = Tang.Factory.getTang();
+
   /**
    * Contains all bindings of event handlers to the bridge.
    */
@@ -60,6 +64,8 @@ public final class Constants {
   public static final Configuration HTTP_SERVER_CONFIGURATION = Configurations.merge(
       HttpHandlerConfiguration.CONF
           .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerReefEventHandler.class)
+          // Add the http status handler.
+          .set(HttpHandlerConfiguration.HTTP_HANDLERS, DriverStatusHTTPHandler.class)
           .build(),
       DriverServiceConfiguration.CONF
           .set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED,
@@ -76,6 +82,10 @@ public final class Constants {
               ReefEventStateManager.DriverRestartActiveContextStateHandler.class)
           .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
               ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
+          .build(),
+      // Bind the HTTP handler for job status
+      TANG.newConfigurationBuilder()
+          .bindImplementation(JobStatusHandler.class, DriverStatusHTTPHandler.class)
           .build()
   );
 

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/DriverStatusHTTPHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/DriverStatusHTTPHandler.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/DriverStatusHTTPHandler.java
new file mode 100644
index 0000000..058f565
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/DriverStatusHTTPHandler.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.client.JobStatusHandler;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+final class DriverStatusHTTPHandler implements HttpHandler, JobStatusHandler {
+
+  private static final Logger LOG = Logger.getLogger(DriverStatusHTTPHandler.class.getName());
+
+  /**
+   * The URI under which this handler answers.
+   */
+  private String uriSpecification = "driverstatus";
+
+  /**
+   * A queue of messages to be sent to the client.
+   */
+  private final Queue<ReefServiceProtos.JobStatusProto> statusMessagesToSend = new LinkedList<>();
+
+  /**
+   * The last status received by this object in its role as JobStatusHandler.
+   */
+  private ReefServiceProtos.JobStatusProto lastStatus = null;
+
+  @Inject
+  DriverStatusHTTPHandler(){
+  }
+
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  @Override
+  public void setUriSpecification(final String newUriSpecification) {
+    this.uriSpecification = newUriSpecification;
+  }
+
+  @Override
+  public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response)
+      throws IOException, ServletException {
+    try (final PrintWriter writer = response.getWriter()) {
+      writer.write(waitAndGetMessage());
+    }
+  }
+
+  @Override
+  public void onNext(final ReefServiceProtos.JobStatusProto value) {
+    LOG.log(Level.INFO, "Received status: {0}", value.getState().name());
+    // Record the status received and notify the thread to send an answer.
+    synchronized (this) {
+      this.statusMessagesToSend.add(value);
+      this.lastStatus = value;
+      this.notifyAll();
+    }
+  }
+
+  @Override
+  public ReefServiceProtos.JobStatusProto getLastStatus() {
+    return this.lastStatus;
+  }
+
+  @Override
+  public String toString() {
+    return "DriverStatusHTTPHandler{uriSpec=" + getUriSpecification() + "}";
+  }
+
+  /**
+   * Waits for a status message to be available and returns it.
+   *
+   * @return the first available status message.
+   */
+  String waitAndGetMessage() {
+    synchronized (this) {
+      // Wait for a message to send.
+      while (this.statusMessagesToSend.isEmpty()) {
+        try {
+          this.wait();
+        } catch (final InterruptedException e) {
+          LOG.log(Level.FINE, "Interrupted. Ignoring.");
+        }
+      }
+
+      // Send the message
+      return getMessageForStatus(this.statusMessagesToSend.poll());
+    }
+  }
+
+  /**
+   * Generates a string to be sent to the client based on a
+   * {@link org.apache.reef.proto.ReefServiceProtos.JobStatusProto}.
+   *
+   * @param status the status to be converted to String.
+   * @return the string to be sent back to the HTTP client.
+   */
+  static String getMessageForStatus(final ReefServiceProtos.JobStatusProto status) {
+    return status.getState().name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/fece6294/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestDriverStatusHTTPHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestDriverStatusHTTPHandler.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestDriverStatusHTTPHandler.java
new file mode 100644
index 0000000..222832a
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestDriverStatusHTTPHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.logging.Logger;
+
+/**
+ * Tests for {@link DriverStatusHTTPHandler}.
+ */
+public final class TestDriverStatusHTTPHandler {
+
+  private static final Logger LOG = Logger.getLogger(TestDriverStatusHTTPHandler.class.getName());
+  private static final String TEST_DRIVER_ID = "TestDriver";
+
+  /**
+   * An array of all statuses to test.
+   */
+  private final ReefServiceProtos.JobStatusProto[] allStatuses = new ReefServiceProtos.JobStatusProto[] {
+      ReefServiceProtos.JobStatusProto.newBuilder()
+          .setIdentifier(TEST_DRIVER_ID)
+          .setState(ReefServiceProtos.State.INIT).build(),
+      ReefServiceProtos.JobStatusProto.newBuilder()
+          .setIdentifier(TEST_DRIVER_ID)
+          .setState(ReefServiceProtos.State.RUNNING).build(),
+      ReefServiceProtos.JobStatusProto.newBuilder()
+          .setIdentifier(TEST_DRIVER_ID)
+          .setState(ReefServiceProtos.State.DONE).build(),
+      ReefServiceProtos.JobStatusProto.newBuilder()
+          .setIdentifier(TEST_DRIVER_ID)
+          .setState(ReefServiceProtos.State.SUSPEND).build(),
+      ReefServiceProtos.JobStatusProto.newBuilder()
+          .setIdentifier(TEST_DRIVER_ID)
+          .setState(ReefServiceProtos.State.FAILED).build(),
+      ReefServiceProtos.JobStatusProto.newBuilder()
+          .setIdentifier(TEST_DRIVER_ID)
+          .setState(ReefServiceProtos.State.KILLED).build()
+  };
+
+  /**
+   * Make sure we get the right strings for the driver status.
+   */
+  @Test
+  public void testMessageForProto() {
+    for (final ReefServiceProtos.JobStatusProto status : allStatuses) {
+      Assert.assertEquals(status.getState().name(), DriverStatusHTTPHandler.getMessageForStatus(status));
+    }
+  }
+
+  /**
+   * Make sure {@link DriverStatusHTTPHandler} implements
+   * {@link org.apache.reef.runtime.common.driver.client.JobStatusHandler}.
+   */
+  @Test
+  public void testLastStatus() {
+    final DriverStatusHTTPHandler tester = new DriverStatusHTTPHandler();
+
+    for (final ReefServiceProtos.JobStatusProto status : allStatuses) {
+      tester.onNext(status);
+      Assert.assertSame(status, tester.getLastStatus());
+    }
+  }
+
+  /**
+   * Test the wait and notify for correctness.
+   */
+  @Test
+  public void testAsyncCalls() throws InterruptedException {
+    final DriverStatusHTTPHandler tester = new DriverStatusHTTPHandler();
+
+    final WaitingRunnable waiter = new WaitingRunnable(tester);
+
+    for (final ReefServiceProtos.JobStatusProto status : allStatuses) {
+      final Thread waitingThread = new Thread(waiter);
+      waitingThread.start();
+      Assert.assertTrue(waitingThread.isAlive());
+      Assert.assertNull(waiter.getResult());
+      tester.onNext(status);
+      waitingThread.join();
+      Assert.assertEquals(DriverStatusHTTPHandler.getMessageForStatus(status), waiter.getResult());
+    }
+  }
+
+  private final class WaitingRunnable implements Runnable {
+    private final DriverStatusHTTPHandler handler;
+    private String result = null;
+
+    private WaitingRunnable(final DriverStatusHTTPHandler handler) {
+      this.handler = handler;
+    }
+
+    @Override
+    public synchronized void run() {
+      result = handler.waitAndGetMessage();
+    }
+
+    public synchronized String getResult() {
+      final String returnValue = result;
+      result = null;
+      return returnValue;
+    }
+  }
+}