You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2018/03/20 19:55:05 UTC

twill git commit: (TWILL-258) Use loopback address for ZK server. Also fixes some race conditions in unit tests

Repository: twill
Updated Branches:
  refs/heads/master 8f70aa4d4 -> ee4d13701


(TWILL-258) Use loopback address for ZK server. Also fixes some race conditions in unit tests

- Fix a race condition in the LocationCacheTest
  - There is a small delay in the current timestamp in the
    LocationCacheCleaner.start and the one in the test method.
- Fix race condition in LogLevelChangeTestRun
  - The test assumes after the root logger level changed, the other logger levels also changed
    in the resource report, which is not true
  - The test is not checking the log levels for all runnnable instances

This closes #68 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/ee4d1370
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/ee4d1370
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/ee4d1370

Branch: refs/heads/master
Commit: ee4d13701b218305d034bfaa8474ef881995e65c
Parents: 8f70aa4
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 19 00:28:20 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Mar 20 12:54:58 2018 -0700

----------------------------------------------------------------------
 .travis.yml                                     |  2 +-
 .../apache/twill/yarn/LocationCacheCleaner.java |  4 +-
 .../apache/twill/yarn/LocationCacheTest.java    | 11 +++-
 .../twill/yarn/LogLevelChangeTestRun.java       | 59 +++++++++++---------
 .../internal/zookeeper/InMemoryZKServer.java    | 18 ++----
 5 files changed, 51 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 55101b7..af74548 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,7 +31,7 @@ branches:
     - /^branch\-.*$/
     - /^feature\/.*$/
 
-script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
+script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false
 
 install: mvn --batch-mode install -P $PROFILE -DskipTests=true
 

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
index 0738218..fed76a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -144,7 +144,9 @@ final class LocationCacheCleaner extends AbstractIdleService {
               }
               // If the location is already pending for cleanup, this won't update the expire time as
               // the comparison of PendingCleanup is only by location.
-              pendingCleanups.add(new PendingCleanup(location, expireTime));
+              if (pendingCleanups.add(new PendingCleanup(location, expireTime))) {
+                LOG.debug("Pending deletion of location {} with expiration time at {}", location, expireTime);
+              }
             }
           }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
index 8ed9ae4..63ca28b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -109,12 +109,17 @@ public class LocationCacheTest {
     newTwillRunner.start();
 
     // Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
-    // collected when the new twill runner was started
+    // collected when the new twill runner was started.
+    // Need to add some time in addition to the antique expiry time because the cache cleaner collects
+    // pending list asynchronously, which the "current" time it uses to calculate the expiration time might be
+    // later than the System.currentTimeMillis() call in the next line.
     ((YarnTwillRunnerService) newTwillRunner)
-      .forceLocationCacheCleanup(System.currentTimeMillis() + Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+      .forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
+                                   Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
 
     // Now there shouldn't be any file under the current session cache directory
-    Assert.assertTrue(currentSessionCache.list().isEmpty());
+    List<Location> locations = currentSessionCache.list();
+    Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
index 6df6d11..a1d8ae6 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -172,20 +173,20 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
 
     // assert that log level is DEBUG
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
 
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
 
     // change the log level to INFO
     controller.updateLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, LogEntry.Level.INFO)).get();
 
     // assert log level has changed to INFO
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
 
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
 
     // change the log level of LogLevelTestRunnable to WARN,
     // change the log level of LogLevelTestSecondRunnable to TRACE
@@ -195,16 +196,16 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
     controller.updateLogLevels(LogLevelTestSecondRunnable.class.getSimpleName(), logLevelSecondRunnable).get();
 
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN), 1);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE), 1);
 
     // change a particular logger to log level warn and reset it back.
     logLevelFirstRunnable = ImmutableMap.of("test", LogEntry.Level.WARN);
     controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
                     20L, TimeUnit.SECONDS, LogEntry.Level.WARN,
-                    ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN));
+                    ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN), 1);
     logLevelFirstRunnable = new HashMap<>();
     logLevelFirstRunnable.put("test", null);
     controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
@@ -212,13 +213,13 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
     result.put("ROOT", LogEntry.Level.WARN);
     result.put("test", null);
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
 
     // reset the log level for a particular logger of LogLevelTestRunnable
     controller.resetRunnableLogLevels(LogLevelTestRunnable.class.getSimpleName(), "test").get();
     result.remove("test");
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
 
     // change the log level of LogLevelTestSecondRunnable to INFO and change instances of it to test if the log level
     // request get applied to container started up later
@@ -228,14 +229,14 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
     controller.changeInstances(LogLevelTestSecondRunnable.class.getSimpleName(), 2).get();
     TimeUnit.SECONDS.sleep(5);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(), 20L, TimeUnit.SECONDS,
-                    LogEntry.Level.INFO, logLevelSecondRunnable);
+                    LogEntry.Level.INFO, logLevelSecondRunnable, 2);
 
     // reset the log levels back to default.
     controller.resetLogLevels().get();
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 2);
 
     // stop
     controller.terminate().get(120, TimeUnit.SECONDS);
@@ -248,29 +249,37 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
   // could return null if the application has not fully started.
   private void waitForLogLevel(TwillController controller, String runnable, long timeout,
                                TimeUnit timeoutUnit, LogEntry.Level expected,
-                               Map<String, LogEntry.Level> expectedArgs) throws InterruptedException {
+                               Map<String, LogEntry.Level> expectedArgs,
+                               int expectedInstances) throws InterruptedException {
 
     Stopwatch stopwatch = new Stopwatch();
     stopwatch.start();
-    LogEntry.Level actual = null;
-    Map<String, LogEntry.Level> actualArgs = null;
-    boolean stopped = false;
-    do {
+    while (stopwatch.elapsedTime(timeoutUnit) < timeout) {
       ResourceReport report = controller.getResourceReport();
+
       if (report == null || report.getRunnableResources(runnable) == null) {
+        TimeUnit.MILLISECONDS.sleep(100);
         continue;
       }
+
+      int matchCount = 0;
       for (TwillRunResources resources : report.getRunnableResources(runnable)) {
-        actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
-        actualArgs = resources.getLogLevels();
-        if (actual != null && actual.equals(expected)) {
-          stopped = true;
-          break;
+        LogEntry.Level actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
+        Map<String, LogEntry.Level> actualArgs = resources.getLogLevels();
+        if (Objects.equals(expected, actual) && Objects.equals(expectedArgs, actualArgs)) {
+          matchCount++;
+        } else {
+          LOG.info("Log levels not match for {}. {} != {} or {} != {}",
+                   runnable, expected, actual, expectedArgs, actualArgs);
         }
       }
+
+      if (matchCount == expectedInstances) {
+        return;
+      }
       TimeUnit.MILLISECONDS.sleep(100);
-    } while (!stopped && stopwatch.elapsedTime(timeoutUnit) < timeout);
-    Assert.assertEquals(expected, actual);
-    Assert.assertEquals(expectedArgs, actualArgs);
+    }
+
+    Assert.fail("Timeout waiting for expected log levels");
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
index f962b68..d18d5ed 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal.zookeeper;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.Executor;
 
 /**
@@ -103,17 +101,11 @@ public final class InMemoryZKServer implements Service {
   }
 
   private InetSocketAddress getAddress(int port) {
-    try {
-      int socketPort = port < 0 ? 0 : port;
-      // This property is needed so that in certain CI environment (e.g. Travis-CI) it can only works properly if
-      // it is binded to the wildcard (0.0.0.0) address
-      if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
-        return new InetSocketAddress(InetAddress.getLocalHost(), socketPort);
-      } else {
-        return new InetSocketAddress(socketPort);
-      }
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
+    int socketPort = port < 0 ? 0 : port;
+    if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
+      return new InetSocketAddress(InetAddress.getLoopbackAddress(), socketPort);
+    } else {
+      return new InetSocketAddress(socketPort);
     }
   }