You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@joshua.apache.org by mj...@apache.org on 2016/08/29 17:42:42 UTC
[06/10] incubator-joshua git commit: JOSHUA-285 JOSHUA-296 Refactored
threading in order to properly propagate failures and remove custom code
JOSHUA-285 JOSHUA-296 Refactored threading in order to properly propagate failures and remove custom code
Project: http://git-wip-us.apache.org/repos/asf/incubator-joshua/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-joshua/commit/d1c9c074
Tree: http://git-wip-us.apache.org/repos/asf/incubator-joshua/tree/d1c9c074
Diff: http://git-wip-us.apache.org/repos/asf/incubator-joshua/diff/d1c9c074
Branch: refs/heads/JOSHUA-304
Commit: d1c9c074544d72ee5335bdd83fe415b45098ab08
Parents: 762d588
Author: Kellen Sunderland <ke...@amazon.com>
Authored: Mon Aug 29 10:08:10 2016 +0200
Committer: Kellen Sunderland <ke...@amazon.com>
Committed: Mon Aug 29 13:52:33 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/joshua/decoder/Decoder.java | 229 ++++---------------
.../apache/joshua/decoder/DecoderThread.java | 10 +-
.../joshua/decoder/JoshuaConfiguration.java | 3 +
.../org/apache/joshua/decoder/Translations.java | 18 ++
.../org/apache/joshua/server/ServerThread.java | 1 -
.../system/MultithreadedTranslationTests.java | 45 +++-
6 files changed, 111 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/Decoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/Decoder.java b/src/main/java/org/apache/joshua/decoder/Decoder.java
index 2d753c1..bc64fda 100644
--- a/src/main/java/org/apache/joshua/decoder/Decoder.java
+++ b/src/main/java/org/apache/joshua/decoder/Decoder.java
@@ -31,11 +31,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import com.google.common.base.Strings;
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.joshua.corpus.Vocabulary;
import org.apache.joshua.decoder.ff.FeatureVector;
import org.apache.joshua.decoder.ff.FeatureFunction;
@@ -83,6 +85,7 @@ import org.slf4j.LoggerFactory;
* @author Zhifei Li, zhifei.work@gmail.com
* @author wren ng thornton wren@users.sourceforge.net
* @author Lane Schwartz dowobeha@users.sourceforge.net
+ * @author Kellen Sunderland kellen.sunderland@gmail.com
*/
public class Decoder {
@@ -109,8 +112,6 @@ public class Decoder {
public static int VERBOSE = 1;
- private BlockingQueue<DecoderThread> threadPool = null;
-
// ===============================================================
// Constructors
// ===============================================================
@@ -147,7 +148,6 @@ public class Decoder {
private Decoder(JoshuaConfiguration joshuaConfiguration) {
this.joshuaConfiguration = joshuaConfiguration;
this.grammars = new ArrayList<>();
- this.threadPool = new ArrayBlockingQueue<>(this.joshuaConfiguration.num_parallel_decoders, true);
this.customPhraseTable = null;
resetGlobalState();
@@ -165,188 +165,70 @@ public class Decoder {
return new Decoder(joshuaConfiguration);
}
- // ===============================================================
- // Public Methods
- // ===============================================================
-
/**
- * This class is responsible for getting sentences from the TranslationRequest and procuring a
- * DecoderThreadRunner to translate it. Each call to decodeAll(TranslationRequest) launches a
- * thread that will read the request's sentences, obtain a DecoderThread to translate them, and
- * then place the Translation in the appropriate place.
- *
- * @author Matt Post <po...@cs.jhu.edu>
+ * This function is the main entry point into the decoder. It translates all the sentences in a
+ * (possibly boundless) set of input sentences. Each request launches its own thread to read the
+ * sentences of the request.
*
+ * @param request the populated {@link TranslationRequestStream}
+ * @throws RuntimeException if any fatal errors occur during translation (surfaced later, while the returned Translations is consumed)
+ * @return an iterable, asynchronously-filled list of Translations
*/
- private class RequestParallelizer extends Thread {
- /* Source of sentences to translate. */
- private final TranslationRequestStream request;
+ public Translations decodeAll(TranslationRequestStream request) {
+ Translations results = new Translations(request);
+ CompletableFuture.runAsync(() -> decodeAllAsync(request, results));
+ return results;
+ }
- /* Where to put translated sentences. */
- private final Translations response;
+ private void decodeAllAsync(TranslationRequestStream request,
+ Translations responseStream) {
- RequestParallelizer(TranslationRequestStream request, Translations response) {
- this.request = request;
- this.response = response;
- }
-
- @Override
- public void run() {
- /*
- * Repeatedly get an input sentence, wait for a DecoderThread, and then start a new thread to
- * translate the sentence. We start a new thread (via DecoderRunnerThread) as opposed to
- * blocking, so that the RequestHandler can go on to the next sentence in this request, which
- * allows parallelization across the sentences of the request.
- */
- for (;;) {
+ // Give the threadpool a friendly name to help debuggers
+ final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("TranslationWorker-%d")
+ .setDaemon(true)
+ .build();
+ ExecutorService executor = Executors.newFixedThreadPool(this.joshuaConfiguration.num_parallel_decoders,
+ threadFactory);
+ try {
+ for (; ; ) {
Sentence sentence = request.next();
if (sentence == null) {
- response.finish();
break;
}
- // This will block until a DecoderThread becomes available.
- DecoderThread thread = Decoder.this.getThread();
- new DecoderThreadRunner(thread, sentence, response).start();
- }
- }
-
- /**
- * Strips the nonterminals from the lefthand side of the rule.
- *
- * @param rule
- * @return
- */
- private String formatRule(Rule rule) {
- String ruleString = "";
- boolean first = true;
- for (int word: rule.getFrench()) {
- if (!first)
- ruleString += " " + Vocabulary.word(word);
- first = false;
- }
-
- ruleString += " |||"; // space will get added with first English word
- first = true;
- for (int word: rule.getEnglish()) {
- if (!first)
- ruleString += " " + Vocabulary.word(word);
- first = false;
- }
-
- // strip of the leading space
- return ruleString.substring(1);
- }
- }
-
- /**
- * Retrieve a thread from the thread pool, blocking until one is available. The blocking occurs in
- * a fair fashion (i.e,. FIFO across requests).
- *
- * @return a thread that can be used for decoding.
- */
- public DecoderThread getThread() {
- try {
- return threadPool.take();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * This class handles running a DecoderThread (which takes care of the actual translation of an
- * input Sentence, returning a Translation object when its done). This is done in a thread so as
- * not to tie up the RequestHandler that launched it, freeing it to go on to the next sentence in
- * the TranslationRequest, in turn permitting parallelization across the sentences of a request.
- *
- * When the decoder thread is finshed, the Translation object is placed in the correct place in
- * the corresponding Translations object that was returned to the caller of
- * Decoder.decodeAll(TranslationRequest).
- *
- * @author Matt Post <po...@cs.jhu.edu>
- */
- private class DecoderThreadRunner extends Thread {
-
- private final DecoderThread decoderThread;
- private final Sentence sentence;
- private final Translations translations;
-
- DecoderThreadRunner(DecoderThread thread, Sentence sentence, Translations translations) {
- this.decoderThread = thread;
- this.sentence = sentence;
- this.translations = translations;
- }
-
- @Override
- public void run() {
- /*
- * Process any found metadata.
- */
-
- /*
- * Use the thread to translate the sentence. Then record the translation with the
- * corresponding Translations object, and return the thread to the pool.
- */
- try {
- Translation translation = decoderThread.translate(this.sentence);
- translations.record(translation);
-
- /*
- * This is crucial! It's what makes the thread available for the next sentence to be
- * translated.
- */
- threadPool.put(decoderThread);
- } catch (Exception e) {
- throw new RuntimeException(String.format(
- "Input %d: FATAL UNCAUGHT EXCEPTION: %s", sentence.id(), e.getMessage()), e);
- // translations.record(new Translation(sentence, null, featureFunctions, joshuaConfiguration));
+ executor.execute(() -> {
+ try {
+ Translation result = decode(sentence);
+ responseStream.record(result);
+ } catch (Throwable ex) {
+ responseStream.propagate(ex);
+ }
+ });
}
+ responseStream.finish();
+ } finally {
+ executor.shutdown();
}
}
- /**
- * This function is the main entry point into the decoder. It translates all the sentences in a
- * (possibly boundless) set of input sentences. Each request launches its own thread to read the
- * sentences of the request.
- *
- * @param request the populated {@link org.apache.joshua.decoder.io.TranslationRequestStream}
- * @throws IOException if there is an error with the input stream or writing the output
- * @return an iterable, asynchronously-filled list of Translations
- */
- public Translations decodeAll(TranslationRequestStream request) throws IOException {
- Translations translations = new Translations(request);
-
- /* Start a thread to handle requests on the input stream */
- new RequestParallelizer(request, translations).start();
-
- return translations;
- }
-
/**
- * We can also just decode a single sentence.
+ * We can also just decode a single sentence in the same thread.
*
* @param sentence {@link org.apache.joshua.lattice.Lattice} input
+ * @throws RuntimeException if any fatal errors occur during translation
* @return the sentence {@link org.apache.joshua.decoder.Translation}
*/
public Translation decode(Sentence sentence) {
- // Get a thread.
-
try {
- DecoderThread thread = threadPool.take();
- Translation translation = thread.translate(sentence);
- threadPool.put(thread);
-
- return translation;
-
- } catch (InterruptedException e) {
- e.printStackTrace();
+ DecoderThread decoderThread = new DecoderThread(this.grammars, Decoder.weights, this.featureFunctions, joshuaConfiguration);
+ return decoderThread.translate(sentence);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format(
+ "Input %d: FATAL UNCAUGHT EXCEPTION: %s", sentence.id(), e.getMessage()), e);
}
-
- return null;
}
/**
@@ -355,14 +237,6 @@ public class Decoder {
* afterwards gets a fresh start.
*/
public void cleanUp() {
- // shut down DecoderThreads
- for (DecoderThread thread : threadPool) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
resetGlobalState();
}
@@ -393,7 +267,7 @@ public class Decoder {
} else { // models: replace the weight
String[] fds = Regex.spaces.split(line);
- StringBuffer newSent = new StringBuffer();
+ StringBuilder newSent = new StringBuilder();
if (!Regex.floatingNumber.matches(fds[fds.length - 1])) {
throw new IllegalArgumentException("last field is not a number; the field is: "
+ fds[fds.length - 1]);
@@ -543,11 +417,8 @@ public class Decoder {
}
// Create the threads
- for (int i = 0; i < joshuaConfiguration.num_parallel_decoders; i++) {
- this.threadPool.put(new DecoderThread(this.grammars, Decoder.weights,
- this.featureFunctions, joshuaConfiguration));
- }
- } catch (IOException | InterruptedException e) {
+ //TODO: (kellens) see if we need to wait until initialized before decoding
+ } catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
@@ -579,7 +450,7 @@ public class Decoder {
int span_limit = Integer.parseInt(parsedArgs.get("maxspan"));
String path = parsedArgs.get("path");
- Grammar grammar = null;
+ Grammar grammar;
if (! type.equals("moses") && ! type.equals("phrase")) {
if (new File(path).isDirectory()) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/DecoderThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/DecoderThread.java b/src/main/java/org/apache/joshua/decoder/DecoderThread.java
index bdbdba0..a6f39b1 100644
--- a/src/main/java/org/apache/joshua/decoder/DecoderThread.java
+++ b/src/main/java/org/apache/joshua/decoder/DecoderThread.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
* @author Zhifei Li, zhifei.work@gmail.com
*/
-public class DecoderThread extends Thread {
+public class DecoderThread {
private static final Logger LOG = LoggerFactory.getLogger(DecoderThread.class);
private final JoshuaConfiguration joshuaConfiguration;
@@ -64,8 +64,9 @@ public class DecoderThread extends Thread {
// ===============================================================
// Constructor
// ===============================================================
+ //TODO: (kellens) why is weights unused?
public DecoderThread(List<Grammar> grammars, FeatureVector weights,
- List<FeatureFunction> featureFunctions, JoshuaConfiguration joshuaConfiguration) throws IOException {
+ List<FeatureFunction> featureFunctions, JoshuaConfiguration joshuaConfiguration) throws IOException {
this.joshuaConfiguration = joshuaConfiguration;
this.allGrammars = grammars;
@@ -84,11 +85,6 @@ public class DecoderThread extends Thread {
// Methods
// ===============================================================
- @Override
- public void run() {
- // Nothing to do but wait.
- }
-
/**
* Translate a sentence.
*
http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java b/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
index e6f2955..6fd73ba 100644
--- a/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
+++ b/src/main/java/org/apache/joshua/decoder/JoshuaConfiguration.java
@@ -272,6 +272,9 @@ public class JoshuaConfiguration {
/* Weights overridden from the command line */
public String weight_overwrite = "";
+ /* Timeout for translation threads. NOTE(review): documented as seconds, but 30_000 looks like milliseconds -- confirm the unit */
+ public long translation_thread_timeout = 30_000;
+
/**
* This method resets the state of JoshuaConfiguration back to the state after initialization.
* This is useful when for example making different calls to the decoder within the same java
http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/decoder/Translations.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/decoder/Translations.java b/src/main/java/org/apache/joshua/decoder/Translations.java
index 1eb859a..b3f3633 100644
--- a/src/main/java/org/apache/joshua/decoder/Translations.java
+++ b/src/main/java/org/apache/joshua/decoder/Translations.java
@@ -20,6 +20,8 @@ package org.apache.joshua.decoder;
import java.util.Iterator;
import java.util.LinkedList;
+
+import com.google.common.base.Throwables;
import org.apache.joshua.decoder.io.TranslationRequestStream;
/**
@@ -50,6 +52,7 @@ public class Translations implements Iterator<Translation>, Iterable<Translation
private boolean spent = false;
private Translation nextTranslation;
+ private Throwable fatalException;
public Translations(TranslationRequestStream request) {
this.request = request;
@@ -144,6 +147,8 @@ public class Translations implements Iterator<Translation>, Iterable<Translation
}
}
+ fatalErrorCheck();
+
/* We now have the sentence and can return it. */
currentID++;
this.nextTranslation = translations.poll();
@@ -155,4 +160,17 @@ public class Translations implements Iterator<Translation>, Iterable<Translation
public Iterator<Translation> iterator() {
return this;
}
+
+ public void propagate(Throwable ex) {
+ synchronized (this) {
+ fatalException = ex;
+ notify();
+ }
+ }
+
+ private void fatalErrorCheck() {
+ if (fatalException != null) {
+ Throwables.propagate(fatalException);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/main/java/org/apache/joshua/server/ServerThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/joshua/server/ServerThread.java b/src/main/java/org/apache/joshua/server/ServerThread.java
index 72caa5f..32c7b91 100644
--- a/src/main/java/org/apache/joshua/server/ServerThread.java
+++ b/src/main/java/org/apache/joshua/server/ServerThread.java
@@ -35,7 +35,6 @@ import java.util.HashMap;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
-import org.apache.joshua.corpus.Vocabulary;
import org.apache.joshua.decoder.Decoder;
import org.apache.joshua.decoder.JoshuaConfiguration;
import org.apache.joshua.decoder.Translation;
http://git-wip-us.apache.org/repos/asf/incubator-joshua/blob/d1c9c074/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java b/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
index 7b1c47f..7a1d9f4 100644
--- a/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
+++ b/src/test/java/org/apache/joshua/system/MultithreadedTranslationTests.java
@@ -31,27 +31,31 @@ import org.apache.joshua.decoder.JoshuaConfiguration;
import org.apache.joshua.decoder.Translation;
import org.apache.joshua.decoder.Translations;
import org.apache.joshua.decoder.io.TranslationRequestStream;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.apache.joshua.decoder.segment_file.Sentence;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertTrue;
/**
* Integration test for multithreaded Joshua decoder tests. Grammar used is a
* toy packed grammar.
*
- * @author kellens
+ * @author Kellen Sunderland kellen.sunderland@gmail.com
*/
public class MultithreadedTranslationTests {
private JoshuaConfiguration joshuaConfig = null;
private Decoder decoder = null;
private static final String INPUT = "A K B1 U Z1 Z2 B2 C";
+ private static final String EXCEPTION_MESSAGE = "This exception should properly propagate";
private int previousLogLevel;
private final static long NANO_SECONDS_PER_SECOND = 1_000_000_000;
- @BeforeMethod
+ @BeforeClass
public void setUp() throws Exception {
joshuaConfig = new JoshuaConfiguration();
joshuaConfig.search_algorithm = "cky";
@@ -88,7 +92,7 @@ public class MultithreadedTranslationTests {
Decoder.VERBOSE = 0;
}
- @AfterMethod
+ @AfterClass
public void tearDown() throws Exception {
this.decoder.cleanUp();
this.decoder = null;
@@ -105,7 +109,7 @@ public class MultithreadedTranslationTests {
// should be sufficient to induce concurrent data access for many shared
// data structures.
- @Test
+ @Test()
public void givenPackedGrammar_whenNTranslationsCalledConcurrently_thenReturnNResults() throws IOException {
// GIVEN
@@ -125,7 +129,7 @@ public class MultithreadedTranslationTests {
ByteArrayOutputStream output = new ByteArrayOutputStream();
// WHEN
- // Translate all spans in parallel.
+ // Translate all segments in parallel.
Translations translations = this.decoder.decodeAll(req);
ArrayList<Translation> translationResults = new ArrayList<Translation>();
@@ -146,10 +150,35 @@ public class MultithreadedTranslationTests {
}
final long translationEndTime = System.nanoTime();
- final double pipelineLoadDurationInSeconds = (translationEndTime - translationStartTime) / ((double)NANO_SECONDS_PER_SECOND);
+ final double pipelineLoadDurationInSeconds = (translationEndTime - translationStartTime)
+ / ((double)NANO_SECONDS_PER_SECOND);
System.err.println(String.format("%.2f seconds", pipelineLoadDurationInSeconds));
// THEN
assertTrue(translationResults.size() == inputLines);
}
+
+ @Test(expectedExceptions = RuntimeException.class,
+ expectedExceptionsMessageRegExp = EXCEPTION_MESSAGE)
+ public void givenDecodeAllCalled_whenRuntimeExceptionThrown_thenPropagate() throws IOException {
+ // GIVEN
+ // A spy request stream that will cause an exception to be thrown on a threadpool thread
+ TranslationRequestStream spyReq = Mockito.spy(new TranslationRequestStream(null, joshuaConfig));
+ doReturn(createSentenceSpyWithRuntimeExceptions()).when(spyReq).next();
+
+ // WHEN
+ // Translate all segments in parallel.
+ Translations translations = this.decoder.decodeAll(spyReq);
+
+ ArrayList<Translation> translationResults = new ArrayList<>();
+ for (Translation t: translations)
+ translationResults.add(t);
+ }
+
+ private Sentence createSentenceSpyWithRuntimeExceptions() {
+ Sentence sent = new Sentence(INPUT, 0, joshuaConfig);
+ Sentence spy = Mockito.spy(sent);
+ Mockito.when(spy.target()).thenThrow(new RuntimeException(EXCEPTION_MESSAGE));
+ return spy;
+ }
}