You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:10 UTC
[64/98] [abbrv] incubator-apex-malhar git commit: - MLHR-1876
#resolve Cleanly terminated window bounded service thread and removed
unnecessary interrupt of main operator thread. - Fixed emitting schemas on
output port in a different thread. - Closed con
- MLHR-1876 #resolve Cleanly terminated window bounded service thread and removed unnecessary interrupt of main operator thread.
- Fixed emitting schemas on output port in a different thread.
- Closed connections in WebSocketInputOperator on teardown
- Removed obsolete unit test
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/79e7eadf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/79e7eadf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/79e7eadf
Branch: refs/heads/master
Commit: 79e7eadf27879e4866ac399e31a41eb1a669eb0c
Parents: 0bb9599
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Oct 21 21:33:53 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Oct 22 15:19:32 2015 -0700
----------------------------------------------------------------------
.../lib/appdata/query/WindowBoundedService.java | 41 ++++++++++++++++----
.../snapshot/AbstractAppDataSnapshotServer.java | 21 +++++++---
.../lib/io/WebSocketInputOperator.java | 9 +++++
.../appdata/query/WindowBoundedServiceTest.java | 27 -------------
4 files changed, 59 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
index 6d229fd..4f653a3 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
@@ -22,7 +22,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
-
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
@@ -59,6 +59,7 @@ public class WindowBoundedService implements Component<OperatorContext>
protected transient ExecutorService executorThread;
private final transient Semaphore mutex = new Semaphore(0);
+ private volatile boolean terminated = false;
public WindowBoundedService(Runnable runnable)
{
@@ -78,8 +79,8 @@ public class WindowBoundedService implements Component<OperatorContext>
@Override
public void setup(OperatorContext context)
{
- executorThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread"));
- executorThread.submit(new AsynchExecutorThread(Thread.currentThread()));
+ executorThread = Executors.newSingleThreadExecutor(new NameableThreadFactory("Query Executor Thread"));
+ executorThread.submit(new AsynchExecutorThread());
}
public void beginWindow(long windowId)
@@ -99,17 +100,31 @@ public class WindowBoundedService implements Component<OperatorContext>
@Override
public void teardown()
{
- executorThread.shutdownNow();
+ LOG.info("Shutting down");
+ terminated = true;
+ mutex.release();
+
+ executorThread.shutdown();
+
+ try {
+ executorThread.awaitTermination(10000L + executeIntervalMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ //Do nothing
+ }
}
public class AsynchExecutorThread implements Callable<Void>
{
- private final Thread mainThread;
private long lastExecuteTime = 0;
+ public AsynchExecutorThread()
+ {
+ }
+
+ @Deprecated
public AsynchExecutorThread(Thread mainThread)
{
- this.mainThread = mainThread;
+ //Do nothing
}
@Override
@@ -121,7 +136,6 @@ public class WindowBoundedService implements Component<OperatorContext>
} catch (Exception e) {
LOG.error("Exception thrown while processing:", e);
mutex.release();
- mainThread.interrupt();
}
return null;
@@ -133,12 +147,25 @@ public class WindowBoundedService implements Component<OperatorContext>
while (true) {
long currentTime = System.currentTimeMillis();
long diff = currentTime - lastExecuteTime;
+
if (diff > executeIntervalMillis) {
lastExecuteTime = currentTime;
mutex.acquireUninterruptibly();
+
+ if (terminated) {
+ LOG.info("Terminated");
+ return;
+ }
+
runnable.run();
mutex.release();
} else {
+
+ if (terminated) {
+ LOG.info("Terminated");
+ return;
+ }
+
Thread.sleep(executeIntervalMillis - diff);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index e10bf9e..a309746 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -21,6 +21,7 @@ package com.datatorrent.lib.appdata.snapshot;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.validation.constraints.NotNull;
@@ -92,9 +93,10 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
*/
private List<GPOMutable> currentData = Lists.newArrayList();
private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
+ private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
@AppData.ResultPort
- public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<String>();
+ public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
@AppData.QueryPort
@InputPortFieldAnnotation(optional=true)
@@ -118,9 +120,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
if (schemaResult != null) {
- String schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
- LOG.debug("emitting {}", schemaResultJSON);
- queryResult.emit(schemaResultJSON);
+ LOG.debug("queueing {}", schemaResult);
+ schemaQueue.add(schemaResult);
}
} else if (query instanceof DataQuerySnapshot) {
queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
@@ -208,7 +209,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
}
{
- Result result = null;
+ Result result;
while((result = queryProcessor.process()) != null) {
String resultJSON = resultSerializerFactory.serialize(result);
@@ -217,6 +218,16 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
}
}
+ {
+ SchemaResult schemaResult;
+
+ while ((schemaResult = schemaQueue.poll()) != null) {
+ String schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
+ LOG.debug("emitting {}", schemaResultJSON);
+ queryResult.emit(schemaResultJSON);
+ }
+ }
+
queryProcessor.endWindow();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index a51dc6f..3d7bc7a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -137,6 +137,15 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
catch (Exception ex) {
LOG.error("Error joining monitor", ex);
}
+
+ if (connection != null) {
+ connection.close();
+ }
+
+ if (client != null) {
+ client.close();
+ }
+
super.teardown();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
index 9da82e9..3fb3780 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
@@ -64,33 +64,6 @@ public class WindowBoundedServiceTest
Assert.assertTrue(counterRunnable.getCounter() > 0);
}
- @Test
- public void exceptionTest() throws Exception
- {
- WindowBoundedService wbs = new WindowBoundedService(1,
- new ExceptionRunnable());
-
- wbs.setup(null);
- wbs.beginWindow(0);
-
- boolean caughtException = false;
-
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- caughtException = true;
- }
-
- try {
- wbs.endWindow();
- } catch(Exception e) {
- caughtException = true;
- }
-
- wbs.teardown();
- Assert.assertEquals(true, caughtException);
- }
-
public static class CounterRunnable implements Runnable
{
private int counter = 0;