You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2018/10/15 22:02:40 UTC
svn commit: r1843956 - in /uima/uimaj/trunk/uimaj-core/src:
main/java/org/apache/uima/analysis_engine/impl/
test/java/org/apache/uima/analysis_engine/impl/
Author: schor
Date: Mon Oct 15 22:02:39 2018
New Revision: 1843956
URL: http://svn.apache.org/viewvc?rev=1843956&view=rev
Log:
[UIMA-5893] experimental, disabled code
Added:
uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/MultiThreadCoordination.java
Modified:
uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/PrimitiveAnalysisEngine_impl.java
uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/AnalysisEngine_implTest.java
uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/PearAnalysisEngineWrapperTest.java
Added: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/MultiThreadCoordination.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/MultiThreadCoordination.java?rev=1843956&view=auto
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/MultiThreadCoordination.java (added)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/MultiThreadCoordination.java Mon Oct 15 22:02:39 2018
@@ -0,0 +1,1880 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.analysis_engine.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.WeakHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.uima.util.Misc;
+
+/**
+ *
+ * ************************************************************
+ * *
+ * * A N E X P E R I M E N T - not hooked up 10/2018
+ * *
+ * ************************************************************
+ * Supplies optional thread coordination when running identical pipelines in a multi-core CPU.
+ * The goal is to arrange to have threads running the same primitive(s) on separate threads
+ * at the same time, in hopes of improving locality of reference, and therefore l1/2/3 memory cache utilization
+ *
+ *
+ * Structure:
+ * 1 instance of this class per coordination group of pipelines
+ *
+ * inner classes:
+ * nbr_threads instances MultiThreadInfo, one per thread
+ * nbr_barrier instances of Barrier_info, one per barrier
+ *
+ * Inputs:
+ * list of coordination specs
+ * Where the barriers are:
+ * fully qualified AE chain, eg /key1/key2/
+ * where key is the key name in the aggregate descriptor
+ * 2 keys: start of high-priority section (the barrier) and
+ * end of high-priority section
+ * nbr of threads to run in parallel
+ * nbr of threads to wait at barrier
+ *
+ * computed: total nbr of threads: = nbr to run in parallel + extras when coordination holdup is happening
+ *
+ * Waiting:
+ * two kinds of waiting:
+ * - low priority "fill-in" threads that activate, but go back to sleep, as needed to
+ * compensate for threads being released at barrier
+ * - barrier hold - where threads wait for cohorts
+ * waiting queues:
+ * - one per barrier
+ * - one for low-priority
+ *
+ * change of state:
+ * - when computed, threads put into queues and marked shouldWait to go to wait state
+ * - when going to wait state, another thread is signaled
+ * -- released barrier first
+ *
+ * - when arrive at barrier, if thread marked to go to wait due to previous barrier being released,
+ * -- make equivalent to having marked a different thread to go to wait due to the barrier being released
+ * -- two cases:
+ * --- not the last to arrive - will wait this thread, and release a low-pri thread
+ * in this case, the equivalent will be to
+ * wait another thread
+ * wait this thread
+ * release another thread (cancel out the first wait-another-thread)
+ * so - just wait this thread at barrier wait, resetting the pending wait
+ * --- is the last to arrive. will wakeup the barrier threads, and wait others
+ * in this case the equivalent will be to
+ * wait another thread
+ * release the barrier threads
+ * wait n-1 low-pri threads
+ * so - wait (n-1) + 1 low-pri threads.
+ *
+ * State maintained:
+ * map - all threads running pipelines to MultiThreadInfo for that thread
+ * map - delegate key names /xxx/yyy/zzz/ to barrier start, and barrier end
+ *
+ * Insertion points in the flow:
+ * - just before and after calling primitive process
+ * - at "before":
+ * - if at barrier, suspend thread until have enough
+ * - if not at barrier, but threadinfo marked as shouldWait, then wait this thread.
+ * - at "after":
+ * - if HIGH_PRIORITY and at barrier-end,
+ * -- lower priority
+ * - if ! HIGH_PRIORITY, another HIGH_PRIORITY item, suspend thread
+ *
+ * -- whenever a thread "waits", it first attempts to wakeup a compensating one
+ * -- high priority preferred
+ * -- order in the queue - from front
+ *
+ * - at beginning of pipeline
+ * -- maintains nbr_in_process - used to detect end-state
+ * -- incr and if tracing sets thread's seq nbr
+ * - at end of pipeline
+ * -- if HIGH_PRIORITY thread,
+ * reset to low priority
+ * -- decr nbr_in_process - used to detect end-state
+ *
+ * Running and Waiting threads go into multiple LinkedBlockingDeque, (set to never block)
+ * running: (low priority running, only) used to pick next thread to wait on when need to wait
+ * waiting: (barrier or low-priority) used to pick next thread to wakeup
+ *
+ * Initial state:
+ * set first baseNbr threads to running (put in low-pri running q, not marked shouldWait).
+ * set remainder threads to waiting (put in low-pri wait q, mark as shouldWait).
+ * end-state = false;
+ * nbr_in_process = 0;
+ * currently - a race condition: it must be true that
+ * enough threads start before one gets to end-of-pipeline
+ * to prevent premature triggering of end-state
+ * eventual fix: use timer, trigger end-state when thread
+ * returns and doesn't come back in xxx.
+ *
+ *
+ *
+ * Algorithm
+ * - goal: run n threads, preferably together following a barrier
+ * - recovery from exceptions: tbd
+ *
+ * simple case: only one barrier. For this,
+ * - keep state: NORMAL, END_STATE
+ * - use a number of threads size of (n-1) + n
+ * - pools:
+ * --- low_pri_wait
+ * - initial setup:
+ * -- all threads at low-pri
+ * -- n threads active, others in low_pri_wait, added to low_pri_wait pool
+ * -- set count_of_threads_running to n
+ * - when thread hits barrier:
+ * - add to barrier, switch thread to hi-pri
+ * - if thread will wait at barrier,
+ * - decr count_of_threads_running
+ * - while (nbr active threads < n) activate a lo-pri_wait thread from low-pri pool
+ * -- if no more, go to end state: in end state, open all barriers (use reset)
+ * --- implies setting up threads at first
+ * - if thread will not wait at barrier:
+ * -- reset (releases all threads at barrier)
+ * -- update count of hi-pri running
+ * -- pick n of the not released threads to switch to shouldWait state
+ * --- account for sync, for already in this state.
+ * - when thread hits non-barrier:
+ * if ( is in shouldWait state)
+ * -- add this thread to low-pri pool (waiting) and wait it
+ *
+ * - when thread finishes a barrier-ed primitive:
+ * -- drop its priority
+ *
+ * for m barriers:
+ * - use pool size of m * (n - 1) + n (m barrier waiting n-1 threads each)
+ *
+ */
+public class MultiThreadCoordination {
+
+// private static final String blanks = " ";
+ // Name of the system property consulted to decide whether tracing is on.
+ private static final String MULTI_THREAD_COORD_TRACE = "uima.multi_thread_coord_trace";
+ // Trace switch, read once at class initialization (presumably true when the property is defined - see Misc).
+ private static final boolean TRACE = Misc.getNoValueSystemProperty(MULTI_THREAD_COORD_TRACE);
+ // Compile-time switch guarding the extra consistency checks (e.g. duplicate adds to a pool).
+ private static final boolean ASSERTS = true;
+ // Compile-time switch guarding every Thread.setPriority call in this class; currently off.
+ private static final boolean USE_PRIORITY = false;
+ // Counter used to number trace lines; null when TRACE is off.
+ private static final AtomicInteger TRACE_ID = TRACE ? new AtomicInteger(0) : null;
+
+ // Both priorities are derived from the priority of whichever thread initializes this class.
+ public static final int HIGH_PRIORITY = Thread.currentThread().getPriority();
+ private static final int LOW_PRIORITY = Thread.currentThread().getPriority() - 1;
+ // Fail class initialization if priorities are in use and LOW_PRIORITY falls below the JVM minimum.
+ static { if (USE_PRIORITY && LOW_PRIORITY < Thread.MIN_PRIORITY) throw new IllegalStateException("thread priority invalid, Current: " + HIGH_PRIORITY); }
+ // When both priorities and tracing are on, report the two priority values once.
+ static { if (USE_PRIORITY && TRACE) System.out.format("TrMTC Priorites %d %d%n", HIGH_PRIORITY, LOW_PRIORITY); }
+
+ // maps from Threads to the per-thread MultiThreadInfo
+ // WeakHashMap uses equals on Thread which is object ==
+ // wrapped in a synchronized map; weak keys let entries go away once the Thread is unreachable
+ private static final Map<Thread, MultiThreadInfo> thread_to_multiThreadCoordination = Collections.synchronizedMap(new WeakHashMap<Thread, MultiThreadInfo>());
+
+ /**
+ * Thread states:
+ * in/out of pipeline K, O
+ * pending low-pri wait, low-pri wait, barrier wait P, W, B
+ * pending running (signaled), running/ high run S, R, H
+ * request work item Q
+ * no more work E
+ * initial_wait (startup) U
+ *
+ * Thread state transitions:
+ *
+ * Events:
+ * Startup
+ * Barrier-arrival
+ * Barrier-release
+ * Barrier-reset (when empty)
+ * end-of-high-pri-section
+ * end-of-pipeline (rotate)
+ *
+ * wait-points (always in-pipeline, somewhere):
+ * at start of process (arrive at barrier)
+ * at exit from process (switch to low pri, low-pri-suspend)
+ * at start-of-pipeline (for initial)
+ * at end-of-pipeline (for hi-pri-to-end, rotate)
+ *
+ */
+
+ /***************************************************
+ * Change thread state *
+ ***************************************************/
+
+ /**
+  * The thread state transitions, one method per transition.
+  * Method names read from_(old state)_to_(new state); the suffix after the triple underscore
+  * lists the pool operations involved, using the two-letter codes explained below.
+  * Implemented by MultiThreadInfo.
+  */
+ interface StateChange {
+ // wa, ra, ba wait,run,barrier add,
+ // wr, rr, br,wait,run,barrier remove
+
+ // run-pool: supplies candidates for pending wait.
+ // once in pending_wait, must be removed from run_pool
+
+ // wait_pool: supplies candidates for fill in, low pri
+ // barrier_pool: supplies candidates for fill in, high pri
+ // once signaled, remove from these pools
+
+ // start wait
+ void from_low_pri_pending_wait_to_wait___wa(); // stop fill-in thread at barrier release
+ // rotate at end, start of thread
+ // see low_pri_wait_to_ low_pri_run, barrier_wait_to_hi_pri_run
+
+ // end hi-pri section
+ void from_high_prty_run_to_low_prty_runf___ra(); // end-of-barrier, no more hi pri waiting
+ void from_high_prty_run_to_waitf___wa(); // end-of-barrier, more hi-pri waiting
+
+ // wake up waiting, hi pri first
+ // is part of Barrier_info
+// void from_barrier_wait_to_hi_prty_run___br(); // barrier release &&
+ // reset2lowPri || rotate || no-more-work substitute
+
+ void from_low_prty_wait_to_low_prty_run___wr_ra(); // rotate at end,
+ // or fill in work
+ // or no-more-work substitute
+
+ // arrive at barrier, wait or go
+ void from_low_prty_run_to_barrier_waitb___rr_ba(); // barrier wait
+ void from_low_prty_run_to_high_prty_run___rr(); // barrier run
+
+ // release barrier - wait low-pri threads to give room for hi pri to run
+// void from_low_prty_run_to_pending_waitf___rr(); // barrier release
+ // is embedded in hold_n_low_pri
+
+ void from_low_prty_run_to_waitb___wa_rr(); // end of pipeline rotate
+
+ void from_low_prty_run_end___rr(); // no more work
+
+ void from_low_pri_pending_wait_to_barrier_wait___ba(); // low_pri_pending arrived at barrier
+ void from_low_pri_pending_wait_to_hi_prty_run(); // low_pri_pending arrived at barrier and tripped it
+ }
+
+
+ /**********************************************
+ * info per pipeline *
+ * pipelines are either *
+ * main pipeline or *
+ * Subpipelines (separate CASs *
+ * created by CAS Multipliers or *
+ * by running AnnotatorGateways *
+ * *
+ * These are kept in a stack *
+ * via a parent link. *
+ * to allow determining when *
+ * the pipeline has ended *
+ **********************************************/
+ /**
+  * One entry in a thread's stack of pipelines, identified by a CAS id plus that CAS's reset count.
+  * Entries are chained through {@code parent} (and {@code child}), so returning from a nested
+  * pipeline can be recognized by finding a matching ancestor.
+  */
+ class Pipeline {
+   /** the thread info owning this pipeline entry */
+   final MultiThreadInfo ti;
+   /** the unique id of this cas; -1 until first set */
+   int casId;
+   /** the number of resets of that cas; -1 until first set */
+   int casResets;
+
+   /** copy of the thread's hi-pri flag, remembered so it can be restored when returning here */
+   boolean isHiPriRun = false;
+
+   /** the pipeline which preceded this one and which might be returned to; null for the root */
+   final Pipeline parent;
+   Pipeline child = null;
+
+   /** Creates a root entry with no parent and no CAS identity yet. */
+   Pipeline(MultiThreadInfo ti) {
+     this(ti, null, -1, -1);
+   }
+
+   Pipeline(MultiThreadInfo ti, Pipeline parent, int casId, int casResets) {
+     this.ti = ti;
+     this.parent = parent;
+     this.casId = casId;
+     this.casResets = casResets;
+   }
+
+   /**
+    * Decides which Pipeline corresponds to the given CAS identity.
+    *
+    * @param casId -
+    * @param casResets -
+    * @return this Pipeline if id and resets both match, or if only the id matches
+    *         (in which case the reset count stored here is updated);
+    *         the nearest ancestor whose id and resets match, if there is one;
+    *         otherwise a new Pipeline whose parent is this one
+    */
+   Pipeline maybe_do_pipeline_change(int casId, int casResets) {
+     if (isSame(casId, casResets)) {
+       return this;
+     }
+
+     // remembered for the trace messages only
+     final int oldId = this.casId;
+     final int oldResets = this.casResets;
+
+     // returning from a "subroutine" to a CAS that is still in progress further up the chain?
+     for (Pipeline ancestor = parent; ancestor != null; ancestor = ancestor.parent) {
+       if (ancestor.isSame(casId, casResets)) {
+         if (TRACE) System.out.println(sb.append(", returning to prev cas/rsts from ").append(oldId).append('/').append(oldResets));
+         return ancestor;
+       }
+     }
+
+     // no ancestor matched: same cas with a new reset count is treated as a new iteration of this entry
+     if (casId == oldId) {
+       this.casResets = casResets;
+       if (TRACE) System.out.println(sb.append(", new iter cas/rsts from ").append(oldId).append('/').append(oldResets));
+       return this;
+     }
+
+     // a different cas: push a new entry below this one
+     if (TRACE) System.out.println(sb.append(", new subr cas/rsts from ").append(oldId).append('/').append(oldResets));
+     return new Pipeline(this.ti, this, casId, casResets);
+   }
+
+   /** @return true when both the cas id and the reset count equal the ones stored here */
+   boolean isSame(int casId, int casResets) {
+     return this.casResets == casResets && this.casId == casId;
+   }
+
+ }
+
+
+ /**********************************************
+ * info per thread *
+ **********************************************/
+ public class MultiThreadInfo implements StateChange {
+ // the thread this info object describes
+ final Thread thread;
+ // also used as the character index of this thread in the trace state strings
+ final int t_number; // 0 to n
+ int seq = -1; // an incrementing sequence number, maybe corresponding to work item number.
+ // set at pipeline enter, -1 is value if not set
+ // per-thread condition created from the coordination instance's lock; awaited in do_wait, signaled in wakeup
+ final Condition condition;
+ final MultiThreadCoordination mtc;
+
+ Pipeline pipeline; // not final, changes when casResets changes, or cas changes
+
+ boolean isWithinPipeline = false;
+ // set to true by from_low_prty_run_end___rr
+ boolean hasNoWork = false;
+ // null is treated as "was cancelled" by from_low_prty_run_end___rr
+ TimerTask newWorkTimer = null;
+
+ boolean initialWait = false; // startup - set to true for waiting threads
+ boolean initial = true; // startup logic
+ // loop flag for do_wait: set true before waiting, cleared by wakeup
+ boolean isKeepWaiting = false;
+ boolean isHiPriRun = false;
+ boolean isLowPriRun = false;
+ boolean isPendingLowPriWait = false;
+ boolean isPendingLowPriWait_inFront; // if should go in at front of q
+// boolean isPendingBarrierWait = false;
+ boolean isInProcess = false; // needed because the end is called an extra time
+ // cleared (set to null) by set_low_priority
+ BarrierInfo currentBarrier = null; // set to the barrier when running at hi priority
+
+ // when true, do_wait stops waiting and throws
+ boolean terminate = false; // set to true to force termination
+
+ /**
+  * Creates the bookkeeping record for one thread.
+  *
+  * @param t the thread being described
+  * @param t_number the number assigned to this thread
+  * @param mtc the owning coordination instance; its lock supplies this thread's wait condition
+  */
+ MultiThreadInfo(Thread t, int t_number, MultiThreadCoordination mtc) {
+   this.mtc = mtc;
+   this.thread = t;
+   this.t_number = t_number;
+   this.condition = mtc.instance_lock.newCondition();
+   this.pipeline = mtc.new Pipeline(this); // root pipeline entry, no CAS identity yet
+   if (TRACE) {
+     System.out.format("TrMTC new Thread %s%n", t.getName());
+   }
+ }
+
+ void handle_possible_cas_change(int casId, int casResets) {
+
+ /****************
+ * initial case *
+ ****************/
+ if (pipeline.casId == -1) {
+ // initialize
+ pipeline.casId = casId;
+ pipeline.casResets = casResets;
+ if (TRACE) System.out.println(sb.append(", init for id/rst:").append(casId).append('/').append(casResets));
+ return;
+ }
+
+ int prev_cas_resets = pipeline.casResets;
+ Pipeline cc = pipeline.maybe_do_pipeline_change(casId, casResets);
+ if (cc == pipeline) {
+ if (prev_cas_resets == pipeline.casResets) {
+ /****************
+ * no change *
+ ****************/
+ return;
+ }
+
+ /*******************
+ * same cas, reset *
+ *******************/
+// if (pipeline.isHiPriRun) {
+// reset_to_low_priority(); // could release other hi-pri thread if exists
+// }
+// start_of_inner_pipeline(pipeline);
+ return;
+ }
+
+ /*********************
+ * new subr pipeline *
+ *********************/
+ if (cc.parent == pipeline) {
+ // added a new pipeline running a fresh CAS
+ if (pipeline.isHiPriRun) {
+ cc.isHiPriRun = true; // init subr to same pri state
+ }
+ pipeline.child = cc;
+ pipeline = cc;
+
+
+// start_of_inner_pipeline(pipeline); // run hi-pri subr at hi-pri
+ return;
+ }
+
+ /*********************
+ * return from subr *
+ *********************/
+ // end of previous pipeline
+ // returning to previous cas already in progress
+ cc.child = null;
+
+ if (pipeline.isHiPriRun && ! cc.isHiPriRun) {
+ pipeline = cc; // so change affects this
+ reset_to_low_priority(); // could release other hi-pri thread if exists
+
+ } else if ( ! pipeline.isHiPriRun && cc.isHiPriRun) {
+ pipeline = cc; // so change affects this
+ from_low_prty_run_to_high_prty_run___rr();
+ } else {
+ pipeline = cc;
+ }
+ }
+
+ /**
+  * Trace support: stores state code {@code c} at this thread's position in the trace state string.
+  * Does nothing when tracing is off.
+  */
+ void set_thread_state(char c) {
+   if (!TRACE) {
+     return;
+   }
+   thread_state.setCharAt(t_number, c);
+ }
+
+
+ /**
+  * Marks this thread as having a low-priority wait pending.
+  *
+  * @param isFirst true if the thread should later be added at the front of the wait pool
+  */
+ void setPendingLowPriWait(boolean isFirst) {
+   isPendingLowPriWait_inFront = isFirst;
+   isPendingLowPriWait = true;
+   if (TRACE) set_thread_state('P');
+ }
+
+ /**
+  * Clears the pending-low-priority-wait mark.
+  *
+  * @throws RuntimeException if the mark was not set
+  */
+ void resetPendingLowPriWait() {
+   if (isPendingLowPriWait) {
+     isPendingLowPriWait = false;
+     return;
+   }
+   throw new RuntimeException(sb.append(" resetting Pending low pri wait, but wasn't set").toString());
+ }
+
+ /**
+  * Flag bookkeeping for entering low-priority wait: consumes the pending-wait mark
+  * (throwing if there was none) and clears the low-priority-run flag. Does not block.
+  */
+ void setLowPriWait() {
+   resetPendingLowPriWait();
+   isLowPriRun = false;
+   if (!TRACE) {
+     return;
+   }
+   System.out.println(sb.append(" set_low_pri_wait +w(t#").append(t_number).append(")"));
+   set_thread_state('W');
+   thread_state_in_out.setCharAt(t_number, 'K');
+ }
+
+ /**
+  * Flag bookkeeping for entering barrier wait: clears the low-priority-run flag. Does not block.
+  *
+  * @param nbrWaiting number reported in the trace message; not used otherwise
+  */
+ void setBarrierWait(int nbrWaiting) {
+   isLowPriRun = false;
+   if (!TRACE) {
+     return;
+   }
+   System.out.println(sb.append(" +hold, nbr_waiting: ").append(nbrWaiting));
+   set_thread_state('B');
+ }
+
+ /**
+  * Flag bookkeeping for running at low priority: sets the low-pri flag and clears the hi-pri
+  * flag on both this thread and its current pipeline. Touches no pool and no thread priority.
+  */
+ void setLowPriRun() {
+   isHiPriRun = false;
+   pipeline.isHiPriRun = false;
+   isLowPriRun = true;
+   if (!TRACE) {
+     return;
+   }
+   set_thread_state('R');
+   System.out.println(sb.append(" Run low-pri"));
+ }
+
+ /**
+  * Switches this thread to the high-priority state: optionally raises the Java thread priority,
+  * then sets the hi-pri flag on this thread and on its current pipeline.
+  *
+  * @param isBarrierWait true if the thread is about to wait at the barrier (traced as 'B'),
+  *        false if it runs at high priority right away (traced as 'H')
+  * @throws RuntimeException if priorities and asserts are enabled and the priority did not take effect
+  */
+ void setHiPriRun(boolean isBarrierWait) {
+   if (USE_PRIORITY) {
+     thread.setPriority(HIGH_PRIORITY);
+     // verify the thread whose priority was just set (same thread set_low_priority inspects),
+     // rather than whichever thread happens to be executing this call
+     if (ASSERTS && thread.getPriority() != HIGH_PRIORITY) {
+       throw new RuntimeException(sb.append("failed to set high priority").toString());
+     }
+   }
+   isHiPriRun = true;
+   pipeline.isHiPriRun = true; // remember in pipeline
+   if (TRACE) {
+     System.out.println(sb.append(" hi pri (t#").append(t_number).append(")"));
+     set_thread_state(isBarrierWait ? 'B' : 'H');
+     show_thread_state();
+   }
+ }
+
+ private boolean wake_up_another_thread_if_not_initial() {
+ // find another thread to wake up
+ boolean wokeUp;
+ if (initialWait) {
+ initialWait = false; // reset the one time startup condition
+ wokeUp = true; // pretend to wakeup if initial wait, without waking anything up.
+ } else {
+ wokeUp = wakeup_hi_or_low();
+ }
+ if (TRACE) show_thread_state();
+ return wokeUp;
+ }
+
+
+ /** Adds this thread to the wait pool, at the front or the back as chosen when the wait was requested. */
+ private void add_to_wait_pool() {
+   if (!isPendingLowPriWait_inFront) {
+     addLast(wait_pool);
+     return;
+   }
+   addFirst(wait_pool);
+ }
+
+ /**
+  * Adds this thread at the front of {@code pool}.
+  * With asserts on, rejects a thread already in the pool, and rejects putting a thread
+  * with a pending low-priority wait into the run pool.
+  *
+  * @param pool the pool to add to
+  * @throws RuntimeException if one of the assert checks fails
+  */
+ private void addFirst(LinkedBlockingDeque<MultiThreadInfo> pool) {
+   if (ASSERTS) {
+     // message carries the accumulated trace text, as the same check in addLast does
+     if (pool.contains(this)) throw new RuntimeException(sb + "\nERROR 2x add t#" + t_number);
+     if (pool == run_pool && isPendingLowPriWait) {
+       throw new RuntimeException(sb + "\nERROR inserting pending wait into run pool");
+     }
+   }
+   if (TRACE) sb.append((pool == run_pool) ? " +rf" : " +wf");
+   pool.addFirst(this);
+ }
+
+ private void addLast(LinkedBlockingDeque<MultiThreadInfo> pool) {
+ if (ASSERTS) {
+ if (pool.contains(this)) throw new RuntimeException(sb + "\nERROR 2x add t#" + t_number);
+ if (pool == run_pool && isPendingLowPriWait) {
+ throw new RuntimeException(sb + "\nERROR inserting pending wait into run pool");
+ }
+ }
+ if (TRACE) sb.append((pool == run_pool) ? " +rl" : " +wl");
+ pool.addLast(this);
+ }
+
+ /**
+  * Removes this thread from {@code pool}.
+  *
+  * @param pool the pool expected to contain this thread
+  * @throws RuntimeException if this thread was not in the pool
+  */
+ private void remove(LinkedBlockingDeque<MultiThreadInfo> pool) {
+   if (pool.remove(this)) {
+     return;
+   }
+   throw new RuntimeException(sb.toString() + "\nnever happen: removing from pool, but wasn't there");
+ }
+
+ /**
+  * Wakes up this thread (a barrier-wait or low-pri-wait thread): clears its keep-waiting
+  * flag and signals its condition. Only does the signalling; pools and state flags are
+  * left to the caller.
+  */
+ private void wakeup() {
+   isKeepWaiting = false; // lets the loop in do_wait exit
+   condition.signal();    // only one is ever waiting
+   if (!TRACE) {
+     return;
+   }
+   System.out.println(sb.append(" Notified: t#").append(t_number));
+ }
+
+
+ /**
+  * Blocks the calling thread on this thread's condition until {@code wakeup} clears the
+  * keep-waiting flag, unless the coordination is already in end-state, in which case it
+  * does not wait at all.
+  *
+  * An interrupt does not end the wait, but it is no longer lost: the interrupt status is
+  * re-asserted on the way out (normal return or exception) so callers can observe it.
+  *
+  * @return true if skipped wait (end-state), false if the thread waited and was woken up
+  * @throws RuntimeException if {@code terminate} was set while waiting
+  */
+ private boolean do_wait() {
+   if (TRACE) System.out.println(sb.append(" starting to wait"));
+
+   if (isEndState) {
+     if (TRACE) System.out.println(sb.append(" skip wait, end-state"));
+     return true;
+   }
+
+   long startSleep = TRACE ? System.nanoTime() : 0;
+   boolean wasInterrupted = false;
+   isKeepWaiting = true;
+   try {
+     while (isKeepWaiting && ! terminate) {
+       try {
+         condition.await(); // need separate object per thread to wait on
+       } catch (InterruptedException e) {
+         // await cleared the interrupt status; remember it and keep waiting.
+         // It is restored only after the loop, otherwise await would throw again at once.
+         wasInterrupted = true;
+         if (TRACE) System.out.println("MultiThreadCoordination singleThread wait got interrupt");
+       }
+     }
+   } finally {
+     if (wasInterrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   if (terminate) throw new RuntimeException("forced termination due to jvm shutdown");
+   startTrace(this, " after wakeup");
+   if (TRACE) System.out.println(sb.append(String.format(" in %,.4f ms", (System.nanoTime() - startSleep) / 1000000.0)));
+   return false;
+ }
+
+ /**
+  * Leaves the high-priority section: lowers the Java thread priority when priorities are in
+  * use, and forgets the current barrier. Does not touch the state flags or pools.
+  *
+  * @throws RuntimeException if priorities are in use and the thread was not at high priority
+  */
+ private void set_low_priority() {
+   if (USE_PRIORITY) {
+     if (thread.getPriority() != HIGH_PRIORITY) {
+       throw new RuntimeException(sb.append(", setting low priority, but wasn't high").toString());
+     }
+     thread.setPriority(LOW_PRIORITY);
+   }
+   currentBarrier = null;
+ }
+
+ /**
+ * reset high pri thread back to low pri
+ * adds to run pool, maybe after waiting
+ * @param ti
+ * @return the barrier info
+ */
+ BarrierInfo reset_to_low_priority() {
+ if (TRACE) {
+ System.out.println(sb.append(", loweredPriority"));
+ }
+ BarrierInfo bi = currentBarrier;
+ if (bi.released) { // if released, then there's still more hi pri work to do
+ from_high_prty_run_to_waitf___wa();
+ } else {
+ from_high_prty_run_to_low_prty_runf___ra();
+ }
+
+ return bi;
+ }
+
+ /*==================== start of interface impls ========================*/
+
+ /**
+  * Puts this thread into low-priority wait: wakes a compensating thread, joins the wait pool,
+  * and blocks. If no thread could be woken, or the wait was skipped because of end-state,
+  * the thread puts itself back into the low-priority run state instead.
+  */
+ @Override
+ public void from_low_pri_pending_wait_to_wait___wa() {
+   setLowPriWait();
+   final boolean mayWait = wake_up_another_thread_if_not_initial(); // true if woke up another or initial
+   add_to_wait_pool();
+
+   // do_wait is only attempted when waiting is allowed; it reports true when it skipped the wait
+   final boolean skippedWait = !mayWait || do_wait();
+   if (!skippedWait) {
+     return;
+   }
+
+   // nobody will wake this thread up, so do here what the waking thread would have done
+   setLowPriRun(); // sets booleans only
+   addFirst(run_pool);
+   remove(wait_pool); // because no notify/signal removed it
+ }
+
+ /**
+  * A low-priority runner arrives at a barrier and has to wait there: it is marked
+  * high priority (as a barrier waiter) and leaves the run pool.
+  */
+ @Override
+ public void from_low_prty_run_to_barrier_waitb___rr_ba() {
+   if (TRACE) {
+     sb.append("(wait)");
+   }
+   final boolean asBarrierWait = true;
+   setHiPriRun(asBarrierWait);
+   final boolean leaveRunPool = true;
+   common_to_barrier_waitb(leaveRunPool);
+ }
+
+ private void common_to_barrier_waitb(boolean remove_from_run_pool) {
+ setBarrierWait(currentBarrier.barrier_wait_pool.size()); // for msg number waiting
+ // remove must happen before wakeup - a low pri wakeup gets added to run pool, which might otherwise be full
+ if (remove_from_run_pool) remove(run_pool); // remove skipped if pendingWait - already removed
+ boolean okToWait = wakeup_hi_or_low();
+ addLast(currentBarrier.barrier_wait_pool);
+ boolean skipped_wait = ! okToWait;
+ if (okToWait) {
+ skipped_wait = do_wait();
+ }
+
+ if (TRACE) set_thread_state('H');
+
+ if (skipped_wait) {
+ // these actions normally done by other thread waking this one up
+ currentBarrier.barrier_wait_pool.removeLast(); // undo the addLast above
+ currentBarrier.maybeResetBarrier();
+ }
+ }
+
+ /**
+  * A thread with a pending low-priority wait arrives at a barrier and has to wait there:
+  * the pending wait is converted into a barrier wait. The thread is not removed from
+  * the run pool here.
+  */
+ @Override
+ public void from_low_pri_pending_wait_to_barrier_wait___ba() {
+   if (TRACE) {
+     System.out.println(sb.append(", converting low pri wait to barrier wait"));
+   }
+   resetPendingLowPriWait(); // just resets boolean
+   final boolean asBarrierWait = true;
+   setHiPriRun(asBarrierWait);
+   final boolean leaveRunPool = false;
+   common_to_barrier_waitb(leaveRunPool);
+ }
+
+ /**
+  * End of the high-priority section with no further hi-pri work waiting:
+  * the thread keeps running, now as a low-priority runner at the front of the run pool.
+  */
+ @Override
+ public void from_high_prty_run_to_low_prty_runf___ra() {
+ set_low_priority(); // changes only the thread priority
+ setLowPriRun();
+ addFirst(run_pool);
+ }
+
+ /**
+  * End of the high-priority section while more hi-pri work is waiting: the thread drops its
+  * priority and goes to low-priority wait, queued at the front of the wait pool.
+  */
+ @Override
+ public void from_high_prty_run_to_waitf___wa() {
+   set_low_priority();
+   // fake a pending front-of-queue wait so the shared wait logic accepts this thread
+   isPendingLowPriWait = true; // just to avoid exception - next call checks
+   isPendingLowPriWait_inFront = true;
+   from_low_pri_pending_wait_to_wait___wa(); // same logic - put into wait state, waking up another
+ }
+
+ /**
+  * Wakes this waiting thread and makes it a low-priority runner at the front of the run pool.
+  * Removal from the wait pool is not done here.
+  */
+ @Override
+ public void from_low_prty_wait_to_low_prty_run___wr_ra() {
+ wakeup(); // only does signalling
+ setLowPriRun();
+ addFirst(run_pool);
+ }
+
+ /**
+  * A low-priority runner becomes a high-priority runner without waiting:
+  * it is flagged high priority and taken out of the run pool.
+  */
+ @Override
+ public void from_low_prty_run_to_high_prty_run___rr() {
+ setHiPriRun(false); // false is high-pri run, not barrier wait
+ remove(run_pool);
+ }
+
+// @Override
+// public void from_low_prty_run_to_pending_waitf___rr() {
+// if (run_pool.isEmpty()) {
+// throw new RuntimeException("never happen");
+// }
+// MultiThreadInfo ti = run_pool.removeLast();
+// if (TRACE) {
+// System.out.println(sb.append(", hold_low_pri removing t#").append(ti.t_number).append(" from run_pool"));
+// }
+// if (ti.isPendingLowPriWait) {
+// throw new RuntimeException("debug ERROR setting pending wait - found a pending wait in run_pool");
+// }
+// ti.setPendingLowPriWait(true);
+// }
+
+ /**
+  * End-of-pipeline rotation: a low-priority runner leaves the run pool and goes to
+  * low-priority wait, queued at the back of the wait pool.
+  */
+ @Override
+ public void from_low_prty_run_to_waitb___wa_rr() {
+   if (TRACE) {
+     System.out.println(sb.append(", removing from run_pool t#").append(t_number));
+   }
+   remove(run_pool);
+   // fake a pending back-of-queue wait so the shared wait logic accepts this thread
+   isPendingLowPriWait = true; // next call checks this
+   isPendingLowPriWait_inFront = false;
+   from_low_pri_pending_wait_to_wait___wa();
+ }
+
+ /**
+  * A thread with a pending low-priority wait becomes a high-priority runner.
+  * Only the high-priority flags are set; no pool is modified, and the pending-wait
+  * flag is not cleared by this method.
+  */
+ @Override
+ public void from_low_pri_pending_wait_to_hi_prty_run() {
+ // not (yet) in wait pool, no need to remove
+ // not in run pool either.
+ setHiPriRun(false); // false is high-pri run, not barrier wait
+ }
+
+ /**
+  * This thread has no more work: it is marked as such and leaves the run pool.
+  * Unless the run pool is full, a waiting thread is woken as a substitute; if none
+  * could be woken, one thread waiting at a barrier is released instead.
+  * Does nothing if the new-work timer was already cancelled.
+  */
+ @Override
+ public void from_low_prty_run_end___rr() {
+   if (newWorkTimer == null) {
+     return; // was cancelled
+   }
+
+   hasNoWork = true;
+   isLowPriRun = false;
+   if (TRACE) set_thread_state('E'); // end state
+
+   // a thread with a pending wait was already removed from the run pool
+   if (!isPendingLowPriWait) {
+     remove(run_pool);
+   }
+
+   if (run_pool.remainingCapacity() == 0) {
+     if (TRACE) {
+       System.out.println(sb.append(", run_pool full, this thread in pending wait"));
+     }
+     return;
+   }
+
+   final boolean wokeAnother = wakeup_hi_or_low(); // continue if can wake up some other thread
+   if (TRACE) {
+     System.out.println(sb.append(", time-out-waiting-for-wk, ").append(wokeAnother ? "woke-up another" : "no more waiting"));
+     show_thread_state();
+   }
+
+   if (!wokeAnother) {
+     wakeup_1_at_barrier();
+   }
+ }
+
+ }
+
+ /**********************************************
+ * info per barrier *
+ **********************************************/
+ /**
+  * State of one barrier: the threads currently held at it and whether it is released.
+  */
+ private class BarrierInfo {
+
+   /** threads waiting at this barrier; capacity is one less than the barrier's thread count */
+   final LinkedBlockingDeque<MultiThreadInfo> barrier_wait_pool;
+   /**
+    * set to true when released, set to false when
+    * barrier_wait_pool reaches 0 and ! end-state
+    */
+   boolean released = false;
+   /** set to true at end-state; prevents the barrier from being reset */
+   boolean keep_released = false;
+   final String start_key;
+
+   /**
+    * @param start_key key naming where the barrier starts
+    * @param end_key accepted but not stored
+    * @param barrierNbrOfThreads number of threads the barrier is sized for
+    */
+   BarrierInfo(String start_key, String end_key, int barrierNbrOfThreads) {
+     this.barrier_wait_pool = new LinkedBlockingDeque<MultiThreadInfo>(barrierNbrOfThreads - 1);
+     this.start_key = start_key;
+   }
+
+   /** Wakes the thread that has waited longest at this barrier, then resets the barrier if it is now empty. */
+   public void from_barrier_wait_to_hi_prty_run___br() {
+     barrier_wait_pool.removeFirst().wakeup();
+     maybeResetBarrier();
+   }
+
+   /** Marks the barrier as not released once nobody is waiting at it, unless it must stay released. */
+   void maybeResetBarrier() {
+     if (keep_released || !barrier_wait_pool.isEmpty()) {
+       return;
+     }
+     released = false; // reset the barrier
+     if (TRACE) {
+       sb.append(" ***barrier_reset***");
+     }
+   }
+
+ }
+
+ /**********************************************
+ * Debug run_pool *
+ **********************************************/
+ /**
+  * A deque that prints one trace line to System.out for every add/remove it performs.
+  * The printing is unconditional (not guarded by TRACE).
+  */
+ private static class DebugDeque extends LinkedBlockingDeque<MultiThreadInfo> {
+   private static final long serialVersionUID = 1L;
+   /** label printed in every trace line */
+   final String name;
+
+   /**
+    * @param name label for the trace lines
+    * @param n capacity of the deque
+    */
+   DebugDeque(String name, int n) {
+     super(n);
+     this.name = name;
+   }
+
+   /**
+    * Builds one trace line.
+    *
+    * @param e the element being added or removed; may be null, in which case the
+    *        thread-specific parts of the line are left out
+    * @param sa text fragments appended to the line
+    * @return the formatted trace line
+    */
+   String x(MultiThreadInfo e, String ... sa ) {
+     StringBuilder sb = new StringBuilder();
+     // TRACE_ID only exists when tracing is on; this class must not fail without it
+     if (TRACE_ID != null) {
+       sb.append(TRACE_ID.getAndIncrement());
+     }
+     sb.append(" TrMTC t#");
+     if (e != null) {
+       sb.append(e.t_number);
+       time_seq(sb, e);
+     } else {
+       sb.append('?');
+     }
+     sb.append(" DebugDeque[").append(name).append("](").append(Integer.toString(size()))
+       .append(") ");
+     for (String s : sa) {
+       sb.append(s);
+     }
+     if (e != null) {
+       sb.append(" ddt#").append(Integer.toString(e.t_number));
+     }
+     return sb.toString();
+   }
+
+   @Override
+   public void addFirst(MultiThreadInfo e) {
+     System.out.println(x(e, "adding First"));
+     super.addFirst(e);
+   }
+
+   @Override
+   public void addLast(MultiThreadInfo e) {
+     System.out.println(x(e, "adding Last"));
+     super.addLast(e);
+   }
+
+   @Override
+   public MultiThreadInfo removeFirst() {
+     MultiThreadInfo e = super.removeFirst();
+     System.out.println(x(e, "removing First"));
+     return e;
+   }
+
+   @Override
+   public MultiThreadInfo removeLast() {
+     MultiThreadInfo e = super.removeLast();
+     System.out.println(x(e, "removing Last"));
+     return e;
+   }
+
+   @Override
+   public boolean remove(Object o) {
+     // remove(Object) accepts anything; only trace thread details when o really is one of ours
+     MultiThreadInfo e = (o instanceof MultiThreadInfo) ? (MultiThreadInfo) o : null;
+     System.out.println(x(e, "removing"));
+     return super.remove(o);
+   }
+ }
+
+ // info for the coordination group
+
+ /**
+ * The barriers in this coordination group.
+ * hashmap for quick contains test.
+ * no sync needed because all the puts are in one thread, which occurs in the constructor, before all the gets.
+ */
+ // barrier lookup by the key where the high-priority section starts
+ final private Map<String, BarrierInfo> barrier_starts = new HashMap<>(); // a hashmap for quick testing of contains
+ // barrier lookup by the key where the high-priority section ends
+ final private Map<String, BarrierInfo> barrier_ends = new HashMap<>(); // a hashmap for quick testing of contains
+
+ /**
+ * should be equal to the number of CPUs perhaps without hyperthreading (depending on mem cache design and size)
+ * Is adjusted down at end of run, to compensate for reduced parallelism
+ */
+
+ // NOTE(review): the comment above says "adjusted down", but this field is final - confirm which is intended
+ final private int baseNbrOfThreads;
+ final private int barrierNbrOfThreads; // number to release at once from a barrier
+
+ final private AtomicInteger created_thread_number = new AtomicInteger(0); // numbers the threads
+ // presumably counts threads that have started running - TODO confirm against its use later in the file
+ final private AtomicInteger nbr_started_threads = new AtomicInteger(0);
+
+ /********************************************************************************************
+ * T H R E A D S T A T E S *
+ ********************************************************************************************
+ * thread is either in UIMA process call or outside of it
+ * - while in process call it may wait, either low-pri wait or at barrier wait
+ * - count nbr of threads in UIMA process call
+ * -- incr at process start, decr at process exit
+ * - if drops below baseNbr (and stays below for timeout = 1 sec (?)) set end state
+ * END STATE:
+ * skips future hold at barrier
+ * skips future hold of low pri when releasing at barrier
+ * skip future wait if thread was marked to wait, and arrives at start of process
+ * release all barriers at end-of-pipeline if wait pool empty or becomes empty
+ *
+ ********************************************************************************************/
+
+ /********************************************************************************************
+ * W A I T and R U N P O O L S ( D E Q U E S ) *
+ ********************************************************************************************
+ * wait_pool: has threads which are eligible to wake up.
+ * Wakeup is ordered, front of deque first.
+ * - add to end when
+ * -- at creation, for threads above threshold
+ * -- rotating out a thread at end of pipeline
+ * - add to beginning when
+ * -- looping to hold n threads after releasing barrier
+ * -- substituting for a should-wait barrier-wait thread which isn't going to wait
+ * - remove from beginning when
+ * -- to compensate for hold at barrier
+ * -- when rotating
+ *
+ * run_pool: has threads which are eligible to suspend
+ * Includes only low-pri threads; high priority ones not put in this pool.
+ * picking for waiting is ordered, front of deque first.
+ * - add to end when
+ * -- at creation for threads below threshold
+ * -- notified after take from wait_pool to compensate for hold at barrier
+ * -- switching from high to low priority at end of process call
+ * (allows earlier threads to run preferentially)
+ * - remove when
+ * -- looping to hold n threads after releasing barrier (removeLast)
+ * -- rotating out a thread at end of pipeline (remove)
+ * -- holding to coordinate (remove)
+ *********************************************************************************************/
+
+ final private LinkedBlockingDeque<MultiThreadInfo> wait_pool;
+ final private LinkedBlockingDeque<MultiThreadInfo> run_pool; // low pri
+ // a third queue is kept in the barrier, the items waiting at the barrier
+ // a 4th count (not queue) is the high-priority running items
+
+// /**
+// * When blocking at a barrier, other compensating low pri threads are woken up.
+// * If run out of available waiting low pri threads, add to pending count.
+// * This can happen if a high pri thread is still running from a barrier.
+// * When switching a hi-pri to low-pri, if pending, set it to wait and decr pending count.
+// */
+// private int pending_waits = 0;
+
+ /**
+ * nbr_in_process, used to signal end-state
+ * incr at pipeline start, decr at pipeline end
+ */
+ private int nbr_in_process = 0;
+ private boolean isEndState = false;
+
+ private final StringBuilder sb = new StringBuilder();
+ private final StringBuilder thread_state_in_out = TRACE ? new StringBuilder() : null;
+ private final StringBuilder thread_state = TRACE ? new StringBuilder() : null;
+ private final StringBuilder thread_state_key = TRACE ? new StringBuilder() : null;
+ private final ArrayList<MultiThreadInfo> multi_thread_infos = new ArrayList<>();
+ private final AtomicInteger seq = new AtomicInteger(-1); // used in msgs
+// private final long start_time = System.nanoTime(); // used in msgs
+
+ /************************************************************************
+ * L O C K I N G *
+ ************************************************************************
+ * there is one lock, per instance of this class *
+ * - instance_lock *
+ * threadInfo instances have individual Condition instances, *
+ * linked to this one lock, *
+ * used for both low-pri waiting and barrier waiting *
+ * Having all of these conditions share one lock simplifies the *
+ * mutual exclusion coordination - no deadlock issues *
+ ************************************************************************/
+ private final ReentrantLock instance_lock = new ReentrantLock(); // must be in initializer
+
+ private final Timer no_more_work_timer = new Timer("no_more_work_timer");
+ private int debug_dump1 = 5;
+
+
+ /********************************************************************************************
+ * C O N S T R U C T O R *
+ ********************************************************************************************
+ * called once per coordination group, when setting it up *
+ * *
+ * @param barrier_ids list of strings of /aaa/bbb/ *
+ * of fully-qualified key names in aggregate chain down to the primitive *
+ * but excluding the primitive name. *
+ * - 2 strings per barrier - the starting key and the ending key *
+ * - ending key can be a non-matching value (e.g. "end") *
+ * which means the barrier section extends thru to the end of the *
+ * pipeline
+ * @param baseNbrOfThreads number of threads to keep active, *
+ * set to manage memory bandwidth, l1/2/3 caching *
+ * @param barrierNbrOfThreads number of threads to await at a barrier before releasing *
+ * to get an affinity effect, this number should be larger than the *
+ * baseNbrOfThreads *
+ * @param timeout - currently ignored *
+ * @param tu - currently ignored *
+ ********************************************************************************************/
+
+ public MultiThreadCoordination(List<String> barrier_ids,
+                                int baseNbrOfThreads,
+                                int barrierNbrOfThreads,
+                                // next 2 are ignored
+                                int timeout,
+                                TimeUnit tu) {
+
+   // barrier ids come as (start key, end key) pairs; reject an odd count up front instead of
+   // failing with a bare IndexOutOfBoundsException part-way through the loop below
+   if ((barrier_ids.size() & 1) != 0) {
+     throw new IllegalArgumentException(
+         "barrier_ids must hold start/end key pairs, but has an odd number of entries: "
+         + barrier_ids.size());
+   }
+
+   this.baseNbrOfThreads = baseNbrOfThreads;
+   this.barrierNbrOfThreads = barrierNbrOfThreads;
+
+   // one BarrierInfo per pair, findable by its start key and by its end key
+   for (int i = 0; i < barrier_ids.size(); i += 2) {
+     String start_key = barrier_ids.get(i);
+     String end_key = barrier_ids.get(i + 1);
+
+     BarrierInfo b = new BarrierInfo(start_key, end_key, barrierNbrOfThreads);
+     barrier_starts.put(start_key, b);
+     barrier_ends.put(end_key, b);
+     if (TRACE) {
+       System.out.format("TrMTC setup adding barrier for start_key: \"%s\", end_key: \"%s\" "
+           + "with %d base threads, %d barrier count%n",
+           start_key, end_key, baseNbrOfThreads, barrierNbrOfThreads);
+     }
+   }
+   // max in wait pool = total threads because more could be pending waiting...
+   wait_pool = TRACE ? new DebugDeque("wait", barrierNbrOfThreads + baseNbrOfThreads)
+                     : new LinkedBlockingDeque<MultiThreadInfo>(barrierNbrOfThreads + baseNbrOfThreads);
+   run_pool = TRACE ? new DebugDeque("run", baseNbrOfThreads)
+                    : new LinkedBlockingDeque<MultiThreadInfo>(baseNbrOfThreads);
+
+   if (TRACE) {
+     // legend for the one-letter codes printed by the thread-state dump;
+     // every entry ends with a newline so each code is on its own line
+     System.out.println("ThreadState dump codes:\n"
+         + " R - low-pri-run\n"
+         + " W - low-pri-wait\n"
+         + " P - pending-low-pri-wait\n"
+         + " B - barrier-wait\n"
+         + " H - hi-pri-run\n"
+         + " Q - request work item\n"
+         + " E = no-more-work\n"
+         + " U = initial_wait (startup)\n"
+         + " K - in pipeline\n"
+         + " O - out-of-pipeline");
+     System.out.format("TrMTC setup finished, baseNbr: %d%n", baseNbrOfThreads);
+   }
+
+   // at JVM shutdown, flag every registered thread for termination and signal its condition
+   Runtime.getRuntime().addShutdownHook(new Thread(null, new Runnable() {
+     @Override
+     public void run() {
+       instance_lock.lock();
+       try {
+         for (MultiThreadInfo ti : multi_thread_infos) {
+           ti.terminate = true;
+           ti.condition.signal();
+         }
+       } finally {
+         instance_lock.unlock();
+       }
+     }
+   }, "stop threads"));
+
+ }
+
+ /*******************************************************************************************
+ * S T A T I C M E T H O D S INTERFACE WITH PIPELINE EVENTS *
+ * *
+ * Use static weakMap with thread as key to get threadInfo *
+ * - if exists, call instance of this class's equivalent method *
+ *******************************************************************************************/
+
+ /**
+  * Pipeline-start event for the current thread.
+  * Forwards to the coordination instance the thread was registered with;
+  * does nothing when the current thread has no registered thread info.
+  */
+ public static void start_of_pipeline() {
+   final MultiThreadInfo info = thread_to_multiThreadCoordination.get(Thread.currentThread());
+   if (info != null) {
+     info.mtc.start_of_pipeline(info);
+   }
+ }
+
+ /*
+ * At end of pipeline
+ */
+ public static void end_of_pipeline() {
+   final MultiThreadInfo info = thread_to_multiThreadCoordination.get(Thread.currentThread());
+   if (info != null) {
+     // only threads with registered thread info take part in the rotation
+     info.mtc.end_of_pipeline_rotate_thread(info);
+   }
+ }
+
+// /*
+// * At end of thread
+// */
+// public static void end_of_thread() {
+// MultiThreadInfo ti = thread_to_multiThreadCoordination.get(Thread.currentThread());
+// if (ti == null) return;
+//
+// ti.mtc.end_of_thread(ti);
+// }
+
+ /*
+ * About to call primitive process
+ * Do nothing if this thread not being coordinated,
+ * else call the mtc
+ * @param id
+ */
+ static MultiThreadInfo at_call_primitive(String id, int casId, int casResets) {
+ MultiThreadInfo ti = thread_to_multiThreadCoordination.get(Thread.currentThread());
+ if (ti == null) return null;
+
+ ti.mtc.at_call_primitive(ti, id, casId, casResets);
+ return ti;
+ }
+
+ /**
+  * Called after a primitive's process call returns.
+  * Runs under instance_lock, which is always released on exit.
+  *
+  * @param ti the calling thread's info
+  * @param key the key of the primitive being exited; looked up among the barrier end keys
+  */
+ void at_call_primitive_exit(MultiThreadInfo ti, String key) {
+   acquireLock_startTrace(ti, " prim_exit ", annot_short_name(key));
+   try {
+     // this primitive ends a barrier section
+     if (barrier_ends.get(key) != null) {
+       ti.reset_to_low_priority(); // maybe waits
+     }
+
+     if (ti.isPendingLowPriWait) {
+       ti.isPendingLowPriWait_inFront = true;
+       ti.from_low_pri_pending_wait_to_wait___wa();
+     }
+   } catch (Throwable t) {
+     // report, then rethrow unchanged
+     System.out.println(sb.append(" Throwable caught"));
+     t.printStackTrace();
+     throw t;
+   } finally {
+     instance_lock.unlock();
+   }
+ }
+
+// private void wait_low_pri(MultiThreadInfo ti) {
+// ti.setLowPriWait();
+// boolean waited = wait_this_thread(ti);
+// if ( ! waited) {
+// remove(wait_pool, ti);
+// set_low_pri_run(ti, false); // puts back in run pool
+// } else {
+// ti.setLowPriRun(); // doesn't update run pool, notifier did that when notifying this thread
+// }
+// }
+
+ /******************************************************************
+ * at call primitive:
+ * if a barrier, hold up until enough cohorts, then release all
+ * if not at barrier, wait the thread if shouldWait (pending)
+ * @param ti -
+ * @param barrier_id -
+ * @param casId -
+ * @param casResets -
+ *******************************************************************/
+ public void at_call_primitive(MultiThreadInfo ti, String barrier_id, int casId, int casResets) {
+   acquireLock_startTrace(ti, " prim_start ", annot_short_name(barrier_id),
+       "; id/rst:", Integer.toString(casId), "/", Integer.toString(casResets));
+   try {
+     if (ti.isPendingLowPriWait) {
+       ti.from_low_pri_pending_wait_to_wait___wa();
+     }
+
+     ti.handle_possible_cas_change(casId, casResets); // may wait if at pipeline end and other hi pri
+
+     final BarrierInfo barrier = barrier_starts.get(barrier_id);
+     if (barrier != null) {
+       /********************************
+        * arrived at a barrier         *
+        ********************************/
+       arrive_at_barrier_wait_or_release(ti, barrier);
+     } else if (ti.isPendingLowPriWait) {
+       /********************************
+        * NOT at a barrier             *
+        ********************************/
+       // happens when this thread was waited above, then woken up, but never started
+       // before another thread reached a barrier and tripped it, causing this thread
+       // to now get marked pending wait
+       ti.from_low_pri_pending_wait_to_wait___wa();
+     }
+   } catch (Throwable t) {
+     // report, then rethrow unchanged
+     System.out.println(sb.append(" Throwable caught"));
+     t.printStackTrace();
+     throw t;
+   } finally {
+     instance_lock.unlock();
+   }
+ }
+
+ /**
+ * lock held
+ * @param ti
+ * @param bi
+ */
+ private void arrive_at_barrier_wait_or_release(MultiThreadInfo ti, BarrierInfo bi) {
+ if (TRACE) sb.append(" at Barrier ");
+ ti.currentBarrier = bi;
+ boolean isHiPri = ti.isHiPriRun; // thread.getPriority() == HIGH_PRIORITY;
+ if (isHiPri) {
+ if (TRACE) System.out.println(sb.append(", UNUSUAL hiPri arrived at barrier, ").append(ti.pipeline.isHiPriRun));
+ // debug
+ if (debug_dump1 > 0) {
+ debug_dump1 --;
+ new Throwable().printStackTrace(System.out);
+ }
+ return;
+ }
+
+// if (isEndState) {
+// return;
+// }
+
+ /***************************************************
+ * barrier wait, or release if cohorts all present
+ * also release if no waiting threads available to take the place (end state)
+ * also release if barrier is in open state
+ ***************************************************/
+ if (bi.released) {
+ if (ti.isPendingLowPriWait) {
+ ti.from_low_pri_pending_wait_to_hi_prty_run();
+ } else {
+ ti.from_low_prty_run_to_high_prty_run___rr();
+ }
+ return;
+ }
+
+ // barrier not yet released
+ int nbr_waiting_at_barrier = bi.barrier_wait_pool.size();
+
+ // hold if not yet accum enough, provided there's a compensating one to wake up
+ if (nbr_waiting_at_barrier < barrierNbrOfThreads - 1 && wait_pool.size() > 0) {
+ barrier_hold(bi, ti);
+ } else {
+ ti.from_low_prty_run_to_high_prty_run___rr();
+ barrier_wakeup(bi); // does pending-waits on up to baseNbr - 1 low-pri running threads.
+ }
+
+ }
+
+ /**
+ * Wake up a barrier
+ * releasing it,
+ * marking some number of low-pri running items to wait (will automatically signal pending hi-pri item)
+ * @param bi - the barrier
+ * @param adj - an adjustment to the number to be "waited"
+ */
+
+ private void barrier_wakeup(BarrierInfo bi) {
+
+ if ( ! bi.released) {
+ int barrier_size = bi.barrier_wait_pool.size();
+ bi.released = true;
+ if (TRACE) System.out.println(sb.append(" ***barrier_released(").append(barrier_size).append(")***"));
+
+ // the barrier might be 25, but the number to initially hold is limited by
+ // the parallelism (e.g. 9)
+ // a count of any high-pri that are still running (from a prior barrier release)
+ int nbr_to_initially_hold = Math.min(bi.barrier_wait_pool.size(), run_pool.size());
+ hold_n_low_pri(nbr_to_initially_hold); // to let all of the barrier ones run
+ } // else another thread arrived at the barrier before it was reset
+ // in which case just let thru, at high priority
+ // not added to any queue
+ else {
+ if (TRACE) System.out.println(sb.append(" UNUSUAL added thread to existing hi-pri released barrier"));
+ }
+ }
+
+ /**
+ * When launching n threads at a barrier, hold up-to-n low priority threads.
+ * may be less than n threads available to hold due to ending state
+ *
+ * called under lock
+ */
+ private void hold_n_low_pri(int nbr_to_hold) {
+ if (run_pool.isEmpty()) return;
+
+ if (TRACE) sb.append(", hold_low_pri: removing:");
+
+ for (int i = 0; i < nbr_to_hold; i++) {
+// from_low_prty_run_to_pending_waitf___rr(); //embeded directly here
+ MultiThreadInfo ti = run_pool.removeLast();
+ if (TRACE) {
+ sb.append("t#").append(ti.t_number).append(", ");
+ }
+ if (ti.isPendingLowPriWait) {
+ throw new RuntimeException("debug ERROR setting pending wait - found a pending wait in run_pool");
+ }
+ ti.setPendingLowPriWait(true); // true: set pending wait in front, eventually
+ }
+ if (TRACE) {
+ System.out.println(sb.append(" from run_pool"));
+ }
+ }
+
+ /**
+ * entered holding instance_lock
+ * @param bi
+ * @param ti
+ */
+ private void barrier_hold(BarrierInfo bi, MultiThreadInfo ti) {
+   // pick the transition matching the thread's current state
+   if ( ! ti.isPendingLowPriWait) {
+     ti.from_low_prty_run_to_barrier_waitb___rr_ba();
+   } else {
+     // pending low-pri wait: a different kind of wait (or run) than low-pri wait
+     ti.from_low_pri_pending_wait_to_barrier_wait___ba();
+   }
+ }
+
+ /**
+ * wakeup a low pri or barrier wait
+ * @return true if a thread was signalled; false couldn't find one
+ */
+ private boolean wakeup_hi_or_low() {
+ // find another thread to wake up
+ for (BarrierInfo bi : barrier_starts.values()) {
+ if (bi.released && ! bi.barrier_wait_pool.isEmpty()) {
+ bi.from_barrier_wait_to_hi_prty_run___br();
+ return true;
+ }
+ }
+ if (! wait_pool.isEmpty()) {
+ return wakeUpLowPri_from_front();
+ }
+ return false;
+ }
+
+ /**
+ * wakes up from front of wait pool
+ * goes to end of run pool
+ * skips trying to wake up out-of-pipeline threads if possible (should never happen)
+ * @return true if found a thread to wake up, false if the wait thread pool is or becomes empty
+ */
+ private boolean wakeUpLowPri_from_front() {
+ // wake up one low-pri thread to compensate for stalling this one
+
+ while ( ! wait_pool.isEmpty()) {
+ MultiThreadInfo to_wakeup = wait_pool.removeFirst();
+ if ( ! to_wakeup.isWithinPipeline) {
+ throw new RuntimeException(sb.append(", error - wait pool has item not in pipeline").toString());
+ }
+
+ to_wakeup.from_low_prty_wait_to_low_prty_run___wr_ra();
+ return true;
+ }
+
+ if (TRACE) System.out.println(sb.append(" wakeupLowPri failed to find"));
+ return false;
+ }
+
+// /**
+// * wakeup low priority
+// * @param to_wakeup the thread to wakeup
+// * @param to_be_put_back a list of items in the low priority queue to be returned
+// */
+// private void wakeup_lo_pri(MultiThreadInfo to_wakeup) {
+// to_wakeup.wakeup(); // only does signalling
+// to_wakeup.from_low_prty_wait_to_low_prty_run___wr_ra();
+// }
+
+// //wait this thread if low-pri and have too many threads running
+// /**
+// * wait this thread
+// *
+// * Wakes up a compensating thread (except if initialWait)
+// * - a high priority thread if possible
+// * - a waiting low pri thread not outside pipeline, from the front of the q
+// * - a waiting low pri thread outside pipeline, from front of the q
+// *
+// * if can't find another thread to wakeup, skip waiting this thread
+// *
+// * Called for both isPendingWait, and explicitly
+// *
+// * @param ti - the thread to examine
+// * @return true if normal completion, false if endstate
+// */
+// private boolean wait_this_thread(MultiThreadInfo ti) {
+//
+// // find another thread to wake up
+// boolean wokeUp;
+// if (ti.initialWait) {
+// ti.initialWait = false; // reset the one time startup condition
+// wokeUp = true; // pretend to wakeup if initial wait, without waking anything up.
+//// addLast(wait_pool, ti);
+//// ti.isPendingLowPriWait = false;
+//// ti.isKeepWaiting = false;
+// } else {
+// wokeUp = wakeup_hi_or_low();
+// }
+// if (TRACE) show_thread_state();
+//
+// add_to_wait_q(ti); // outside of next if, because
+// // if ! wokeup, still will remove this from waitpool
+//
+// if (wokeUp) {
+// if (TRACE) System.out.println(sb.append(" starting to wait"));
+//
+// if ( ! isEndState) {
+// long startSleep = TRACE ? System.nanoTime() : 0;
+// ti.isKeepWaiting = true;
+// while (ti.isKeepWaiting) {
+// try {
+// ti.condition.await(); // need separate object per thread to wait on
+// } catch (InterruptedException e) {
+// if (TRACE) System.out.println("MultiThreadCoordination singleThread wait got interrupt");
+// }
+// }
+//
+// startTrace(ti, " after wakeup");
+// if (TRACE) System.out.println(sb.append(String.format(" in %,.4f ms", (System.nanoTime() - startSleep) / 1000000.0)));
+// return true;
+// } else {
+// if (TRACE) System.out.println(sb.append(" skip wait, end-state"));
+// return false; // no wait, was end state
+// }
+// } else {
+// if (TRACE) System.out.println(sb.append(" skip wait, no available corresponding thread to wakeup"));
+// return false;
+// }
+// }
+
+
+ /**
+ * Adds this thread to an appropriate wait q.
+ * This is done after any wakeup logic, to prevent that logic from waking up
+ * this very thread.
+ *
+ * There are 2 wait queues: 1) the barrier wait q, 2) the low-pri wait q
+ * The low-pri wait queue might add to the front or back.
+ *
+ * @param ti - the thread info being added to a wait queue
+ */
+// void add_to_wait_q(MultiThreadInfo ti) {
+// if (ti.isPendingBarrierWait) {
+// if (ti.isPendingLowPriWait) {
+// throw new RuntimeException(sb.append(" *** ERROR conflict in pending waits - both set").toString());
+// }
+// addLast(ti.currentBarrier.barrier_wait_pool, ti);
+// ti.isPendingBarrierWait = false;
+// } else {
+// add_to_wait_pool(ti);
+// }
+// }
+
+ /**
+ * Called in different thread when setting up worker threads
+ * @param t the worker thread
+ */
+ public void addThread(Thread t) {
+   final int threadNbr = created_thread_number.getAndIncrement();
+   final MultiThreadInfo info = new MultiThreadInfo(t, threadNbr, this);
+   multi_thread_infos.add(info);
+
+   if (USE_PRIORITY) {
+     t.setPriority(LOW_PRIORITY);
+     if (ASSERTS && t.getPriority() != LOW_PRIORITY) {
+       throw new RuntimeException(sb.append("failed to set low priority").toString());
+     }
+   }
+
+   // makes the static pipeline-event entry points find this thread's info
+   thread_to_multiThreadCoordination.put(t, info);
+
+   if (TRACE) {
+     // one column per thread in the state dump; the key row shows the last digit of the thread number
+     final String nbrText = Integer.toString(threadNbr);
+     thread_state_key.append(nbrText.charAt(nbrText.length() - 1));
+     thread_state.append(' ');
+     thread_state_in_out.append('O');
+   }
+
+   // Placement into the run pool, or marking as pending wait, is not done here:
+   // it is done at start_of_pipeline in order to capture order of items.
+
+   if (TRACE) {
+     System.out.format("TrMTC setup added %d thread %s%n", threadNbr, t.getName());
+   }
+ }
+
+// /**
+// * called by primitive - process when process returns with hi pri thread (running after barrier
+// */
+//
+// void hi_pri_proc_end(MultiThreadInfo ti, String key) {
+// BarrierInfo bi = barrier_ends.get(key);
+// if (bi == null) return;
+//
+// acquireLock_startTrace(ti, " end_of_process_hi_pri");
+// try {
+// reset_to_low_priority_at_barrier_end(ti);
+// } catch (Throwable e) {
+// System.out.println(sb.append(" Throwable caught"));
+// e.printStackTrace();
+// throw e;
+// } finally {
+//
+// instance_lock.unlock();
+// }
+// }
+
+// /**
+// * reset to low priority, at end of hi priority section, or end of pipeline.
+// * if more hi-pri waiting to be released,
+// * - put this thread in low-pri wait
+// * - release the next hi-pri thread
+// *
+// * otherwise, put back in low-pri run pool
+// * @param ti -
+// * @returns true if put into run pool, false if put into wait pool
+// */
+// private void reset_to_low_priority_at_barrier_end(MultiThreadInfo ti) {
+// BarrierInfo bi = reset_to_low_priority(ti); // adds to run pool unless other hi-pri waiting
+// }
+
+ /**
+ * called when about to send work into the pipeline
+ * @param ti the thread info
+ */
+ private void start_of_pipeline(MultiThreadInfo ti) {
+
+ acquireLock_startTrace(ti, " start of pipeline");
+ try {
+ ti.isInProcess = true;
+ TimerTask tt = ti.newWorkTimer;
+ ti.newWorkTimer = null;
+ if (tt != null) {
+ tt.cancel();
+ }
+
+ nbr_in_process ++;
+ ti.isWithinPipeline = true;
+ if (TRACE) {
+ ti.seq = seq.incrementAndGet();
+ sb.append(" new seq#: ").append(ti.seq);
+ thread_state_in_out.setCharAt(ti.t_number, 'K');
+ }
+ if (ti.pipeline.child != null) {
+ // other thread finished, but never got a chance to clean up
+ ti.pipeline.child = null;
+ if (TRACE) System.out.println(sb.append("; cleanup child pipelines"));
+ }
+
+ if (TRACE) {
+ show_thread_state();
+ }
+
+ if (ti.initial) {
+ ti.initial = false;
+ int n = nbr_started_threads.incrementAndGet();
+ if (n <= baseNbrOfThreads) {
+ run_pool.addLast(ti);
+ ti.isLowPriRun = true;
+ if (TRACE) ti.set_thread_state('R');
+ } else {
+ ti.setPendingLowPriWait(false);
+ ti.initialWait = true;
+ if (TRACE)ti.set_thread_state('P');
+ ti.from_low_pri_pending_wait_to_wait___wa();
+ }
+ }
+
+ if (ASSERTS) {
+ if (ti.hasNoWork) throw new RuntimeException("not handled, should never happen");
+ if (USE_PRIORITY && ti.thread.getPriority() != LOW_PRIORITY) {
+ System.out.println("WARN: start of pipeline with thread NOT at LOW_PRIORITY");
+ }
+ }
+
+ } catch (Throwable e) {
+ System.out.println(sb.append(" Throwable caught"));
+ e.printStackTrace();
+ throw e;
+ } finally {
+ instance_lock.unlock();
+ }
+ }
+
+ private void start_of_inner_pipeline(Pipeline pipeline) {
+ if (USE_PRIORITY) pipeline.ti.thread.setPriority(LOW_PRIORITY); // in case someone else set it up
+ pipeline.ti.setLowPriRun(); // just sets boolean states
+ if (TRACE) {
+ show_thread_state();
+ }
+ }
+ // called only at end of pipeline.
+ // rotate this thread to the end and suspend
+ /**
+ * At end of pipeline
+ *
+ * If hi-pri thread, reset to low-pri.
+ *
+ * Rotate this thread to the end of the wait q.
+ *
+ * wait this thread
+ * -- implies waking up another one
+ * - this -> wait(end)
+ * - detect end-state when the nbr_in_process drops below the baseNbrOfThreads.
+ *
+ * Edge cases:
+ * - this thread already marked pending wait:
+ * -- this -> wait(end)
+ *
+ * @param ti this thread, to be suspended
+ */
+ private void end_of_pipeline_rotate_thread(MultiThreadInfo ti) {
+ acquireLock_startTrace(ti, " end-of-pipeline");
+ try {
+
+ if ( ! ti.isInProcess) {
+ // is second spurious call, ignore
+ return;
+ }
+ ti.isInProcess = false;
+
+ boolean wasHighPri = ti.isHiPriRun; //ti.thread.getPriority() == HIGH_PRIORITY;
+
+ if (ASSERTS) {
+ if (wasHighPri && ! ti.pipeline.isHiPriRun) throw new RuntimeException(sb.append(", inconsistent1").toString());
+ if ( ! wasHighPri && ti.pipeline.isHiPriRun) throw new RuntimeException(sb.append(", inconsistent2").toString());
+ }
+
+ while(ti.pipeline.parent != null) ti.pipeline = ti.pipeline.parent;
+
+ if (ti.isPendingLowPriWait) {
+ // could happen if another thread releases a barrier, which puts this thread into pending wait
+ ti.from_low_pri_pending_wait_to_wait___wa();
+ }
+
+ if (wasHighPri) {
+ ti.reset_to_low_priority(); // maybe waits if other hi-pri
+ } else {
+ ti.from_low_prty_run_to_waitb___wa_rr();
+ }
+
+ if (TRACE) {
+ System.out.println(sb.append(" req new work"));
+ ti.set_thread_state('Q');
+ }
+ // when this wakes up, will go to request a new item.
+ // time out this event
+ ti.hasNoWork = false;
+
+ if (TRACE) thread_state_in_out.setCharAt(ti.t_number, 'O');
+ ti.isWithinPipeline = false;
+
+ nbr_in_process --;
+// maybeSetEndState();
+
+ ti.newWorkTimer = createTimeOutNewWork(ti);
+ no_more_work_timer.schedule(ti.newWorkTimer, 5000L); // 5 seconds
+
+ } catch (Throwable e) {
+ System.out.println(sb.append(" Throwable caught"));
+ e.printStackTrace();
+ throw e;
+ } finally {
+ instance_lock.unlock();
+ }
+ }
+
+ /**
+  * Trace-only: restarts the shared trace buffer with a new message header
+  * (trace id, thread number, time and sequence number), followed by the given parts.
+  * Does nothing unless TRACE is on.
+  *
+  * @param ti the thread the message is about
+  * @param sa message parts, appended in order
+  */
+ private void startTrace(MultiThreadInfo ti, String ... sa) {
+   if ( ! TRACE) {
+     return;
+   }
+   sb.setLength(0);
+   sb.append(TRACE_ID.getAndIncrement()).append(" TrMTC t#").append(ti.t_number);
+   time_seq(sb, ti);
+   for (String part : sa) {
+     sb.append(part);
+   }
+ }
+
+ /**
+  * Trace-only: prints the thread-number key row, then the row of per-thread state codes
+  * together with a count for each state. Does nothing unless TRACE is on.
+  */
+ private void show_thread_state() {
+   if ( ! TRACE) {
+     return;
+   }
+
+   // tally the one-letter state codes
+   int running_high = 0;
+   int running_low = 0;
+   int barrier_wait = 0;
+   int waiting_low = 0;
+   int pending_low_wait = 0;
+   int req = 0;
+   int empty = 0;
+   int init = 0;
+// int in_pipeline = 0;
+// int out_of_pipeline = 0;
+
+   for (int i = 0; i < thread_state.length(); i++) {
+     switch (thread_state.charAt(i)) {
+       case 'H': running_high++; break;
+       case 'R': running_low++; break;
+       case 'B': barrier_wait++; break;
+       case 'W': waiting_low++; break;
+       case 'P': pending_low_wait++; break;
+       case 'Q': req++; break;
+       case 'E': empty++; break;
+       case 'U': init++; break;
+       case 'X': break;
+       default: throw new RuntimeException("never happen " + i);
+     }
+   }
+
+// for (int i = 0; i < thread_state_in_out.length(); i++) {
+//   char c = thread_state_in_out.charAt(i);
+//   switch (c) {
+//     case 'K': in_pipeline++; break;
+//     case 'O': out_of_pipeline++; break;
+//     default: throw new RuntimeException("never happen " + i);
+//   }
+// }
+
+   // nesting depth of each thread's current pipeline;
+   // only consumed by the depth display below, which is currently commented out
+   int[] depths = new int[multi_thread_infos.size()];
+   int maxDepth = 0;
+   int next = 0;
+   for (MultiThreadInfo ti : multi_thread_infos) {
+     int depth = 0;
+     for (Pipeline p = ti.pipeline; p.parent != null; p = p.parent) {
+       depth++;
+     }
+     depths[next++] = depth;
+     maxDepth = Math.max(maxDepth, depth);
+   }
+
+   System.out.println("TS: " + exp2(thread_state_key));
+
+   System.out.format("TS: %s --H: %d, R: %d-- ==B: %d, W: %d, P: %d== Q: %d, E: %d, U: %d%n",
+       exp2(thread_state),
+       running_high, running_low,
+       barrier_wait, waiting_low, pending_low_wait,
+       req, empty, init);
+
+   // temporarily turn off
+// if (maxDepth > 0) {
+//   char[] ca = new char[depths.length];
+//   for (int i = 0; i < depths.length; i++) {
+//     String ss = Integer.toString(depths[i]);
+//     ca[i] = ss.charAt(ss.length() - 1);
+//     if (ca[i] == '0') ca[i] = ' ';
+//   }
+//   System.out.format("TS: %s%n", exp2(new StringBuilder(new String(ca))));
+// }
+//
+// System.out.format("TS: %s in: %d, out: %d%n",
+//     exp2(thread_state_in_out),
+//     in_pipeline, out_of_pipeline);
+ }
+
+ private String exp2(StringBuilder s) {
+ StringBuilder sb = new StringBuilder(s.length() * 2);
+ for (int i = 0; i < s.length(); i++) {
+ sb.append(s.charAt(i));
+ if (i % 2 == 1) sb.append(' ');
+ }
+ return sb.toString();
+ }
+
+ static private StringBuilder time_seq(StringBuilder sb, MultiThreadInfo ti) {
+ sb.append(String.format(" %1$tH:%1$tM:%1$tS.%1$tL", new Date()))
+ .append(" s#").append(ti.seq);
+ return sb;
+ }
+
+ /**
+  * Takes instance_lock, then starts a new trace message.
+  * The caller is responsible for unlocking.
+  *
+  * @param ti the thread the trace message is about
+  * @param s message parts passed on to startTrace
+  */
+ private void acquireLock_startTrace(MultiThreadInfo ti, String ... s) {
+   // lock first: needed to test ti.shouldWait
+   instance_lock.lock();
+   startTrace(ti, s);
+ }
+
+ private TimerTask createTimeOutNewWork(final MultiThreadInfo ti) {
+ return new TimerTask() {
+ @Override
+ public void run() {
+ acquireLock_startTrace(ti, " time out - no more work");
+ try {
+ ti.from_low_prty_run_end___rr();
+ } catch (Throwable e) {
+ if (TRACE) System.out.println(sb.append(" Throwable caught, not being rethrown"));
+ e.printStackTrace();
+ // not rethrown, because it causes the timer thread to fail
+ } finally {
+ instance_lock.unlock();
+ }
+ }
+ };
+ }
+
+// private void set_low_pri_run(MultiThreadInfo ti, boolean isFirst) {
+// ti.setLowPriRun();
+// if (isFirst) {
+// ti.addFirst(run_pool);
+// } else {
+// ti.addLast(run_pool);
+// }
+// }
+
+ private String annot_short_name(String id) {
+ String[] a = id.split("/");
+ StringBuilder sb = new StringBuilder(id.length());
+ for (String s : a) {
+ if (s.length() > 0) {
+ sb.append(shorten(s, 16)).append('/');
+ }
+ }
+ sb.setLength(sb.length() - 1);
+ return sb.toString();
+ }
+
+ private String shorten(String s, int len) {
+ if (s.length() <= len + 1) return s;
+
+ int l = len >> 1;
+ return s.substring(0, l) + "." + s.substring(s.length() - l);
+ }
+
+ /** find and wakeup one barrier hold */
+ void wakeup_1_at_barrier() {
+   for (BarrierInfo barrier : barrier_starts.values()) {
+     // the first barrier that has a waiting thread gets one released
+     if ( ! barrier.barrier_wait_pool.isEmpty()) {
+       if (TRACE) System.out.println(sb.append(", releasing from barrier at end"));
+       barrier.from_barrier_wait_to_hi_prty_run___br();
+       return;
+     }
+   }
+   // nothing is waiting at any barrier
+   if (TRACE) System.out.println(sb.append(", all barriers empty, reducing active threads by 1"));
+ }
+
+// private void maybeSetEndState() {
+// /**************************************************
+// * ASSUME: if nbr_in_process falls below base,
+// * then at "end state"
+// * ASSUME: on startup, the nbr_in_process rises above
+// * base before first pipeline finishes
+// * (otherwise, may have to put in some kind of startup mode)
+// **************************************************/
+// if (nbr_in_process < baseNbrOfThreads) {
+// if (TRACE) System.out.println("****************SETTING END STATE***************");
+// isEndState = true; // maybe replace with other
+//// wakeUp(); // wake up one thread, it's guaranteed to arrive here
+//// wakeUp(); // and one extra
+// }
+//
+// }
+}
Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/PrimitiveAnalysisEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/PrimitiveAnalysisEngine_impl.java?rev=1843956&r1=1843955&r2=1843956&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/PrimitiveAnalysisEngine_impl.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/analysis_engine/impl/PrimitiveAnalysisEngine_impl.java Mon Oct 15 22:02:39 2018
@@ -35,6 +35,7 @@ import org.apache.uima.analysis_engine.A
import org.apache.uima.analysis_engine.CasIterator;
import org.apache.uima.analysis_engine.ResultNotSupportedException;
import org.apache.uima.analysis_engine.ResultSpecification;
+import org.apache.uima.analysis_engine.impl.MultiThreadCoordination.MultiThreadInfo;
import org.apache.uima.analysis_engine.impl.compatibility.AnalysisComponentAdapterFactory;
import org.apache.uima.cas.AbstractCas;
import org.apache.uima.cas.CAS;
@@ -60,11 +61,12 @@ import org.apache.uima.util.Logger;
*
*/
public class PrimitiveAnalysisEngine_impl extends AnalysisEngineImplBase implements AnalysisEngine {
+
+ private static final boolean IS_MULTI_THREAD_COORD = false; // for experimenting
+
/**
* UIMA-5043 Set & restore the UimaContextHolder around calls to user code so it can be used to access the External Settings
*/
-
-
private static final Class<PrimitiveAnalysisEngine_impl> CLASS_NAME = PrimitiveAnalysisEngine_impl.class;
/**
@@ -304,6 +306,23 @@ public class PrimitiveAnalysisEngine_imp
* @see AnalysisEngine#processAndOutputNewCASes(CAS)
*/
public CasIterator processAndOutputNewCASes(CAS aCAS) throws AnalysisEngineProcessException {
+ if (IS_MULTI_THREAD_COORD) { // experimental path; the constant is declared false in this revision, so this branch is disabled
+ String key = ((UimaContextAdmin)getUimaContext()).getQualifiedContextName(); // the qualified context name is passed as the key to both coordination calls
+ MultiThreadInfo mti = null; // stays null unless at_call_primitive returns a value, so a failed entry skips the exit call below
+ try {
+ mti = MultiThreadCoordination.at_call_primitive(key, ((CASImpl)aCAS).getCasId(), ((CASImpl)aCAS).getCasResets());
+ return innerCall(aCAS);
+ } finally {
+ if (mti != null) { // pair the exit notification with a successful entry, even when innerCall throws
+ mti.mtc.at_call_primitive_exit(mti, key);
+ }
+ }
+ } else { // coordination disabled: delegate straight to the processing logic
+ return innerCall(aCAS);
+ }
+ }
+
+ private CasIterator innerCall(CAS aCAS) throws AnalysisEngineProcessException {
enterProcess();
try {
// make initial call to the AnalysisComponent
@@ -385,12 +404,12 @@ public class PrimitiveAnalysisEngine_imp
mAnalysisComponent.setResultSpecification(analysisComponentResultSpec);
mResultSpecChanged = false;
}
-
// insure view is passed to switch / restore class loader https://issues.apache.org/jira/browse/UIMA-2211
((CASImpl)view).switchClassLoaderLockCasCL(this.getResourceManager().getExtensionClassLoader());
// call the process method
mAnalysisComponent.process(casToPass);
+
getMBean().incrementCASesProcessed();
//note we do not clear the CAS's currentComponentInfo at this time
Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/AnalysisEngine_implTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/AnalysisEngine_implTest.java?rev=1843956&r1=1843955&r2=1843956&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/AnalysisEngine_implTest.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/AnalysisEngine_implTest.java Mon Oct 15 22:02:39 2018
@@ -1579,7 +1579,7 @@ public class AnalysisEngine_implTest ext
// Write out descriptor
File cloneFile = new File(inFile.getParentFile(), "CopyOfAggregateWithManyDelegates.xml");
BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(cloneFile));
- XMLSerializer xmlSerializer = new XMLSerializer(true);
+ XMLSerializer xmlSerializer = new XMLSerializer(false);
xmlSerializer.setOutputStream(os);
// set the amount to a value which will show up if used
// indent should not be used because we're using a parser mode which preserves
Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/PearAnalysisEngineWrapperTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/PearAnalysisEngineWrapperTest.java?rev=1843956&r1=1843955&r2=1843956&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/PearAnalysisEngineWrapperTest.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/analysis_engine/impl/PearAnalysisEngineWrapperTest.java Mon Oct 15 22:02:39 2018
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.uima.analysis_engine.impl;
import java.io.File;