You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@joshua.apache.org by mj...@apache.org on 2016/08/29 17:42:42 UTC

[06/10] incubator-joshua git commit: JOSHUA-285 JOSHUA-296 Refactored threading in order to properly propagate failures and remove custom code

JOSHUA-285 JOSHUA-296 Refactored threading in order to properly propagate failures and remove custom code


Project: http://git-wip-us.apache.org/repos/asf/incubator-joshua/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-joshua/commit/d1c9c074
Tree: http://git-wip-us.apache.org/repos/asf/incubator-joshua/tree/d1c9c074
Diff: http://git-wip-us.apache.org/repos/asf/incubator-joshua/diff/d1c9c074

Branch: refs/heads/JOSHUA-304
Commit: d1c9c074544d72ee5335bdd83fe415b45098ab08
Parents: 762d588
Author: Kellen Sunderland <ke...@amazon.com>
Authored: Mon Aug 29 10:08:10 2016 +0200
Committer: Kellen Sunderland <ke...@amazon.com>
Committed: Mon Aug 29 13:52:33 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/joshua/decoder/Decoder.java | 229 ++++---------------
 .../apache/joshua/decoder/DecoderThread.java    |  10 +-
 .../joshua/decoder/JoshuaConfiguration.java     |   3 +
 .../org/apache/joshua/decoder/Translations.java |  18 ++
 .../org/apache/joshua/server/ServerThread.java  |   1 -
 .../system/MultithreadedTranslationTests.java   |  45 +++-
 6 files changed, 111 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/Decoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/Decoder.java b/src/main/java/org/apache/joshua/decoder/Decoder.java
index 2d753c1..bc64fda 100644
--- a/src/main/java/org/apache/joshua/decoder/Decoder.java
+++ b/src/main/java/org/apache/joshua/decoder/Decoder.java
@@ -31,11 +31,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import com.google.common.base.Strings;
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.joshua.corpus.Vocabulary;
 import org.apache.joshua.decoder.ff.FeatureVector;
 import org.apache.joshua.decoder.ff.FeatureFunction;
@@ -83,6 +85,7 @@ import org.slf4j.LoggerFactory;
  * @author Zhifei Li, zhifei.work@gmail.com
  * @author wren ng thornton wren@users.sourceforge.net
  * @author Lane Schwartz dowobeha@users.sourceforge.net
+ * @author Kellen Sunderland kellen.sunderland@gmail.com
  */
 public class Decoder {
 
@@ -109,8 +112,6 @@ public class Decoder {
 
   public static int VERBOSE = 1;
 
-  private BlockingQueue<DecoderThread> threadPool = null;
-
   // ===============================================================
   // Constructors
   // ===============================================================
@@ -147,7 +148,6 @@ public class Decoder {
   private Decoder(JoshuaConfiguration joshuaConfiguration) {
     this.joshuaConfiguration = joshuaConfiguration;
     this.grammars = new ArrayList<>();
-    this.threadPool = new ArrayBlockingQueue<>(this.joshuaConfiguration.num_parallel_decoders, true);
     this.customPhraseTable = null;
     
     resetGlobalState();
@@ -165,188 +165,70 @@ public class Decoder {
     return new Decoder(joshuaConfiguration);
   }
 
-  // ===============================================================
-  // Public Methods
-  // ===============================================================
-
   /**
-   * This class is responsible for getting sentences from the TranslationRequest and procuring a
-   * DecoderThreadRunner to translate it. Each call to decodeAll(TranslationRequest) launches a
-   * thread that will read the request's sentences, obtain a DecoderThread to translate them, and
-   * then place the Translation in the appropriate place.
-   *
-   * @author Matt Post <po...@cs.jhu.edu>
+   * This function is the main entry point into the decoder. It translates all the sentences in a
+   * (possibly boundless) set of input sentences. Each request is read asynchronously, and its
+   * sentences are translated in parallel on a pool of worker threads created for that request.
    *
+   * @param request the populated {@link TranslationRequestStream}
+   * @throws RuntimeException if any fatal errors occur during translation
+   * @return an iterable, asynchronously-filled list of Translations
    */
-  private class RequestParallelizer extends Thread {
-    /* Source of sentences to translate. */
-    private final TranslationRequestStream request;
+  public Translations decodeAll(TranslationRequestStream request) {
+    Translations results = new Translations(request);
+    CompletableFuture.runAsync(() -> decodeAllAsync(request, results));
+    return results;
+  }
 
-    /* Where to put translated sentences. */
-    private final Translations response;
+  private void decodeAllAsync(TranslationRequestStream request,
+                              Translations responseStream) {
 
-    RequestParallelizer(TranslationRequestStream request, Translations response) {
-      this.request = request;
-      this.response = response;
-    }
-
-    @Override
-    public void run() {
-      /*
-       * Repeatedly get an input sentence, wait for a DecoderThread, and then start a new thread to
-       * translate the sentence. We start a new thread (via DecoderRunnerThread) as opposed to
-       * blocking, so that the RequestHandler can go on to the next sentence in this request, which
-       * allows parallelization across the sentences of the request.
-       */
-      for (;;) {
+    // Give the threadpool a friendly name to help debuggers
+    final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("TranslationWorker-%d")
+            .setDaemon(true)
+            .build();
+    ExecutorService executor = Executors.newFixedThreadPool(this.joshuaConfiguration.num_parallel_decoders,
+            threadFactory);
+    try {
+      for (; ; ) {
         Sentence sentence = request.next();
 
         if (sentence == null) {
-          response.finish();
           break;
         }
 
-        // This will block until a DecoderThread becomes available.
-        DecoderThread thread = Decoder.this.getThread();
-        new DecoderThreadRunner(thread, sentence, response).start();
-      }
-    }
-
-    /**
-     * Strips the nonterminals from the lefthand side of the rule.
-     *
-     * @param rule
-     * @return
-     */
-    private String formatRule(Rule rule) {
-      String ruleString = "";
-      boolean first = true;
-      for (int word: rule.getFrench()) {
-        if (!first)
-          ruleString += " " + Vocabulary.word(word);
-        first = false;
-      }
-
-      ruleString += " |||"; // space will get added with first English word
-      first = true;
-      for (int word: rule.getEnglish()) {
-        if (!first)
-          ruleString += " " + Vocabulary.word(word);
-        first = false;
-      }
-
-      // strip of the leading space
-      return ruleString.substring(1);
-    }
-  }
-
-  /**
-   * Retrieve a thread from the thread pool, blocking until one is available. The blocking occurs in
-   * a fair fashion (i.e,. FIFO across requests).
-   *
-   * @return a thread that can be used for decoding.
-   */
-  public DecoderThread getThread() {
-    try {
-      return threadPool.take();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return null;
-  }
-
-  /**
-   * This class handles running a DecoderThread (which takes care of the actual translation of an
-   * input Sentence, returning a Translation object when its done). This is done in a thread so as
-   * not to tie up the RequestHandler that launched it, freeing it to go on to the next sentence in
-   * the TranslationRequest, in turn permitting parallelization across the sentences of a request.
-   *
-   * When the decoder thread is finshed, the Translation object is placed in the correct place in
-   * the corresponding Translations object that was returned to the caller of
-   * Decoder.decodeAll(TranslationRequest).
-   *
-   * @author Matt Post <po...@cs.jhu.edu>
-   */
-  private class DecoderThreadRunner extends Thread {
-
-    private final DecoderThread decoderThread;
-    private final Sentence sentence;
-    private final Translations translations;
-
-    DecoderThreadRunner(DecoderThread thread, Sentence sentence, Translations translations) {
-      this.decoderThread = thread;
-      this.sentence = sentence;
-      this.translations = translations;
-    }
-
-    @Override
-    public void run() {
-      /*
-       * Process any found metadata.
-       */
-      
-      /*
-       * Use the thread to translate the sentence. Then record the translation with the
-       * corresponding Translations object, and return the thread to the pool.
-       */
-      try {
-        Translation translation = decoderThread.translate(this.sentence);
-        translations.record(translation);
-
-        /*
-         * This is crucial! It's what makes the thread available for the next sentence to be
-         * translated.
-         */
-        threadPool.put(decoderThread);
-      } catch (Exception e) {
-        throw new RuntimeException(String.format(
-            "Input %d: FATAL UNCAUGHT EXCEPTION: %s", sentence.id(), e.getMessage()), e);
-        //        translations.record(new Translation(sentence, null, featureFunctions, joshuaConfiguration));
+        executor.execute(() -> {
+          try {
+            Translation result = decode(sentence);
+            responseStream.record(result);
+          } catch (Throwable ex) {
+            responseStream.propagate(ex);
+          }
+        });
       }
+      responseStream.finish();
+    } finally {
+      executor.shutdown();
     }
   }
 
-  /**
-   * This function is the main entry point into the decoder. It translates all the sentences in a
-   * (possibly boundless) set of input sentences. Each request launches its own thread to read the
-   * sentences of the request.
-   *
-   * @param request the populated {@link org.apache.joshua.decoder.io.TranslationRequestStream}
-   * @throws IOException if there is an error with the input stream or writing the output
-   * @return an iterable, asynchronously-filled list of Translations
-   */
-  public Translations decodeAll(TranslationRequestStream request) throws IOException {
-    Translations translations = new Translations(request);
-
-    /* Start a thread to handle requests on the input stream */
-    new RequestParallelizer(request, translations).start();
-
-    return translations;
-  }
-
 
   /**
-   * We can also just decode a single sentence.
+   * We can also just decode a single sentence in the same thread.
    *
    * @param sentence {@link org.apache.joshua.lattice.Lattice} input
+   * @throws RuntimeException if any fatal errors occur during translation
    * @return the sentence {@link org.apache.joshua.decoder.Translation}
    */
   public Translation decode(Sentence sentence) {
-    // Get a thread.
-
     try {
-      DecoderThread thread = threadPool.take();
-      Translation translation = thread.translate(sentence);
-      threadPool.put(thread);
-
-      return translation;
-
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+      DecoderThread decoderThread = new DecoderThread(this.grammars, Decoder.weights, this.featureFunctions, joshuaConfiguration);
+      return decoderThread.translate(sentence);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+              "Input %d: FATAL UNCAUGHT EXCEPTION: %s", sentence.id(), e.getMessage()), e);
     }
-
-    return null;
   }
 
   /**
@@ -355,14 +237,6 @@ public class Decoder {
    * afterwards gets a fresh start.
    */
   public void cleanUp() {
-    // shut down DecoderThreads
-    for (DecoderThread thread : threadPool) {
-      try {
-        thread.join();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
     resetGlobalState();
   }
 
@@ -393,7 +267,7 @@ public class Decoder {
 
           } else { // models: replace the weight
             String[] fds = Regex.spaces.split(line);
-            StringBuffer newSent = new StringBuffer();
+            StringBuilder newSent = new StringBuilder();
             if (!Regex.floatingNumber.matches(fds[fds.length - 1])) {
               throw new IllegalArgumentException("last field is not a number; the field is: "
                   + fds[fds.length - 1]);
@@ -543,11 +417,8 @@ public class Decoder {
       }
 
       // Create the threads
-      for (int i = 0; i < joshuaConfiguration.num_parallel_decoders; i++) {
-        this.threadPool.put(new DecoderThread(this.grammars, Decoder.weights,
-            this.featureFunctions, joshuaConfiguration));
-      }
-    } catch (IOException | InterruptedException e) {
+      //TODO: (kellens) see if we need to wait until initialized before decoding
+    } catch (IOException e) {
       LOG.warn(e.getMessage(), e);
     }
 
@@ -579,7 +450,7 @@ public class Decoder {
         int span_limit = Integer.parseInt(parsedArgs.get("maxspan"));
         String path = parsedArgs.get("path");
 
-        Grammar grammar = null;
+        Grammar grammar;
         if (! type.equals("moses") && ! type.equals("phrase")) {
           if (new File(path).isDirectory()) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/DecoderThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/DecoderThread.java b/src/main/java/org/apache/joshua/decoder/DecoderThread.java
index bdbdba0..a6f39b1 100644
--- a/src/main/java/org/apache/joshua/decoder/DecoderThread.java
+++ b/src/main/java/org/apache/joshua/decoder/DecoderThread.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * @author Zhifei Li, zhifei.work@gmail.com
  */
 
-public class DecoderThread extends Thread {
+public class DecoderThread {
   private static final Logger LOG = LoggerFactory.getLogger(DecoderThread.class);
 
   private final JoshuaConfiguration joshuaConfiguration;
@@ -64,8 +64,9 @@ public class DecoderThread extends Thread {
   // ===============================================================
   // Constructor
   // ===============================================================
+  //TODO: (kellens) why is weights unused?
   public DecoderThread(List<Grammar> grammars, FeatureVector weights,
-      List<FeatureFunction> featureFunctions, JoshuaConfiguration joshuaConfiguration) throws IOException {
+                       List<FeatureFunction> featureFunctions, JoshuaConfiguration joshuaConfiguration) throws IOException {
 
     this.joshuaConfiguration = joshuaConfiguration;
     this.allGrammars = grammars;
@@ -84,11 +85,6 @@ public class DecoderThread extends Thread {
   // Methods
   // ===============================================================
 
-  @Override
-  public void run() {
-    // Nothing to do but wait.
-  }
-
   /**
    * Translate a sentence.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java b/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
index e6f2955..6fd73ba 100644
--- a/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
+++ b/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
@@ -272,6 +272,9 @@ public class JoshuaConfiguration {
   /* Weights overridden from the command line */
   public String weight_overwrite = "";
 
+  /* Timeout for translation threads (documented as seconds, but the 30_000 default suggests milliseconds -- TODO confirm unit) */
+  public long translation_thread_timeout = 30_000;
+
   /**
    * This method resets the state of JoshuaConfiguration back to the state after initialization.
    * This is useful when for example making different calls to the decoder within the same java

http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/Translations.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/Translations.java b/src/main/java/org/apache/joshua/decoder/Translations.java
index 1eb859a..b3f3633 100644
--- a/src/main/java/org/apache/joshua/decoder/Translations.java
+++ b/src/main/java/org/apache/joshua/decoder/Translations.java
@@ -20,6 +20,8 @@ package org.apache.joshua.decoder;
 
 import java.util.Iterator;
 import java.util.LinkedList;
+
+import com.google.common.base.Throwables;
 import org.apache.joshua.decoder.io.TranslationRequestStream;
 
 /**
@@ -50,6 +52,7 @@ public class Translations implements Iterator<Translation>, Iterable<Translation
   private boolean spent = false;
 
   private Translation nextTranslation;
+  private Throwable fatalException;
 
   public Translations(TranslationRequestStream request) {
     this.request = request;
@@ -144,6 +147,8 @@ public class Translations implements Iterator<Translation>, Iterable<Translation
         }
       }
 
+      fatalErrorCheck();
+
       /* We now have the sentence and can return it. */
       currentID++;
       this.nextTranslation = translations.poll();
@@ -155,4 +160,17 @@ public class Translations implements Iterator<Translation>, Iterable<Translation
   public Iterator<Translation> iterator() {
     return this;
   }
+
+  public void propagate(Throwable ex) {
+    synchronized (this) {
+      fatalException = ex;
+      notify();
+    }
+  }
+
+  private void fatalErrorCheck() {
+    if (fatalException != null) {
+      Throwables.propagate(fatalException);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/server/ServerThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/server/ServerThread.java b/src/main/java/org/apache/joshua/server/ServerThread.java
index 72caa5f..32c7b91 100644
--- a/src/main/java/org/apache/joshua/server/ServerThread.java
+++ b/src/main/java/org/apache/joshua/server/ServerThread.java
@@ -35,7 +35,6 @@ import java.util.HashMap;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 
-import org.apache.joshua.corpus.Vocabulary;
 import org.apache.joshua.decoder.Decoder;
 import org.apache.joshua.decoder.JoshuaConfiguration;
 import org.apache.joshua.decoder.Translation;

http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java b/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
index 7b1c47f..7a1d9f4 100644
--- a/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
+++ b/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
@@ -31,27 +31,31 @@ import org.apache.joshua.decoder.JoshuaConfiguration;
 import org.apache.joshua.decoder.Translation;
 import org.apache.joshua.decoder.Translations;
 import org.apache.joshua.decoder.io.TranslationRequestStream;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.apache.joshua.decoder.segment_file.Sentence;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertTrue;
 
 /**
  * Integration test for multithreaded Joshua decoder tests. Grammar used is a
  * toy packed grammar.
  *
- * @author kellens
+ * @author Kellen Sunderland kellen.sunderland@gmail.com
  */
 public class MultithreadedTranslationTests {
 
   private JoshuaConfiguration joshuaConfig = null;
   private Decoder decoder = null;
   private static final String INPUT = "A K B1 U Z1 Z2 B2 C";
+  private static final String EXCEPTION_MESSAGE = "This exception should properly propagate";
   private int previousLogLevel;
   private final static long NANO_SECONDS_PER_SECOND = 1_000_000_000;
 
-  @BeforeMethod
+  @BeforeClass
   public void setUp() throws Exception {
     joshuaConfig = new JoshuaConfiguration();
     joshuaConfig.search_algorithm = "cky";
@@ -88,7 +92,7 @@ public class MultithreadedTranslationTests {
     Decoder.VERBOSE = 0;
   }
 
-  @AfterMethod
+  @AfterClass
   public void tearDown() throws Exception {
     this.decoder.cleanUp();
     this.decoder = null;
@@ -105,7 +109,7 @@ public class MultithreadedTranslationTests {
   // should be sufficient to induce concurrent data access for many shared
   // data structures.
 
-  @Test
+  @Test()
   public void givenPackedGrammar_whenNTranslationsCalledConcurrently_thenReturnNResults() throws IOException {
     // GIVEN
 
@@ -125,7 +129,7 @@ public class MultithreadedTranslationTests {
     ByteArrayOutputStream output = new ByteArrayOutputStream();
 
     // WHEN
-    // Translate all spans in parallel.
+    // Translate all segments in parallel.
     Translations translations = this.decoder.decodeAll(req);
 
     ArrayList<Translation> translationResults = new ArrayList<Translation>();
@@ -146,10 +150,35 @@ public class MultithreadedTranslationTests {
     }
 
     final long translationEndTime = System.nanoTime();
-    final double pipelineLoadDurationInSeconds = (translationEndTime - translationStartTime) / ((double)NANO_SECONDS_PER_SECOND);
+    final double pipelineLoadDurationInSeconds = (translationEndTime - translationStartTime)
+            / ((double)NANO_SECONDS_PER_SECOND);
     System.err.println(String.format("%.2f seconds", pipelineLoadDurationInSeconds));
 
     // THEN
     assertTrue(translationResults.size() == inputLines);
   }
+
+  @Test(expectedExceptions = RuntimeException.class,
+          expectedExceptionsMessageRegExp = EXCEPTION_MESSAGE)
+  public void givenDecodeAllCalled_whenRuntimeExceptionThrown_thenPropagate() throws IOException {
+    // GIVEN
+    // A spy request stream that will cause an exception to be thrown on a threadpool thread
+    TranslationRequestStream spyReq = Mockito.spy(new TranslationRequestStream(null, joshuaConfig));
+    doReturn(createSentenceSpyWithRuntimeExceptions()).when(spyReq).next();
+
+    // WHEN
+    // Translate all segments in parallel.
+    Translations translations = this.decoder.decodeAll(spyReq);
+
+    ArrayList<Translation> translationResults = new ArrayList<>();
+    for (Translation t: translations)
+      translationResults.add(t);
+  }
+
+  private Sentence createSentenceSpyWithRuntimeExceptions() {
+    Sentence sent = new Sentence(INPUT, 0, joshuaConfig);
+    Sentence spy = Mockito.spy(sent);
+    Mockito.when(spy.target()).thenThrow(new RuntimeException(EXCEPTION_MESSAGE));
+    return spy;
+  }
 }