You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:10 UTC

[64/98] [abbrv] incubator-apex-malhar git commit: - MLHR-1876 #resolve Cleanly terminated window bounded service thread and removed unnecessary interrupt of main operator thread. - Fixed emitting schemas on output port in a different thread. - Closed con

- MLHR-1876 #resolve Cleanly terminated window bounded service thread and removed unnecessary interrupt of main operator thread.
- Fixed emitting schemas on output port in a different thread.
- Closed connections in WebSocketInput Operator on teardown
- Removed obsolete unit test


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/79e7eadf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/79e7eadf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/79e7eadf

Branch: refs/heads/master
Commit: 79e7eadf27879e4866ac399e31a41eb1a669eb0c
Parents: 0bb9599
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Oct 21 21:33:53 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Oct 22 15:19:32 2015 -0700

----------------------------------------------------------------------
 .../lib/appdata/query/WindowBoundedService.java | 41 ++++++++++++++++----
 .../snapshot/AbstractAppDataSnapshotServer.java | 21 +++++++---
 .../lib/io/WebSocketInputOperator.java          |  9 +++++
 .../appdata/query/WindowBoundedServiceTest.java | 27 -------------
 4 files changed, 59 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
index 6d229fd..4f653a3 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
@@ -22,7 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
-
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
 
@@ -59,6 +59,7 @@ public class WindowBoundedService implements Component<OperatorContext>
   protected transient ExecutorService executorThread;
 
   private final transient Semaphore mutex = new Semaphore(0);
+  private volatile boolean terminated = false;
 
   public WindowBoundedService(Runnable runnable)
   {
@@ -78,8 +79,8 @@ public class WindowBoundedService implements Component<OperatorContext>
   @Override
   public void setup(OperatorContext context)
   {
-    executorThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread"));
-    executorThread.submit(new AsynchExecutorThread(Thread.currentThread()));
+    executorThread = Executors.newSingleThreadExecutor(new NameableThreadFactory("Query Executor Thread"));
+    executorThread.submit(new AsynchExecutorThread());
   }
 
   public void beginWindow(long windowId)
@@ -99,17 +100,31 @@ public class WindowBoundedService implements Component<OperatorContext>
   @Override
   public void teardown()
   {
-    executorThread.shutdownNow();
+    LOG.info("Shutting down");
+    terminated = true;
+    mutex.release();
+
+    executorThread.shutdown();
+    
+    try {
+      executorThread.awaitTermination(10000L + executeIntervalMillis, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ex) {
+      //Do nothing
+    }
   }
 
   public class AsynchExecutorThread implements Callable<Void>
   {
-    private final Thread mainThread;
     private long lastExecuteTime = 0;
 
+    public AsynchExecutorThread()
+    {
+    }
+
+    @Deprecated
     public AsynchExecutorThread(Thread mainThread)
     {
-      this.mainThread = mainThread;
+      //Do nothing
     }
 
     @Override
@@ -121,7 +136,6 @@ public class WindowBoundedService implements Component<OperatorContext>
       } catch (Exception e) {
         LOG.error("Exception thrown while processing:", e);
         mutex.release();
-        mainThread.interrupt();
       }
 
       return null;
@@ -133,12 +147,25 @@ public class WindowBoundedService implements Component<OperatorContext>
       while (true) {
         long currentTime = System.currentTimeMillis();
         long diff = currentTime - lastExecuteTime;
+
         if (diff > executeIntervalMillis) {
           lastExecuteTime = currentTime;
           mutex.acquireUninterruptibly();
+
+          if (terminated) {
+            LOG.info("Terminated");
+            return;
+          }
+
           runnable.run();
           mutex.release();
         } else {
+
+          if (terminated) {
+            LOG.info("Terminated");
+            return;
+          }
+
           Thread.sleep(executeIntervalMillis - diff);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index e10bf9e..a309746 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -21,6 +21,7 @@ package com.datatorrent.lib.appdata.snapshot;
 import java.io.IOException;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.validation.constraints.NotNull;
 
@@ -92,9 +93,10 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
    */
   private List<GPOMutable> currentData = Lists.newArrayList();
   private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
+  private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>();
 
   @AppData.ResultPort
-  public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<String>();
+  public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>();
 
   @AppData.QueryPort
   @InputPortFieldAnnotation(optional=true)
@@ -118,9 +120,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
         SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
 
         if (schemaResult != null) {
-          String schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
-          LOG.debug("emitting {}", schemaResultJSON);
-          queryResult.emit(schemaResultJSON);
+          LOG.debug("queueing {}", schemaResult);
+          schemaQueue.add(schemaResult);
         }
       } else if (query instanceof DataQuerySnapshot) {
         queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
@@ -208,7 +209,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     }
 
     {
-      Result result = null;
+      Result result;
 
       while((result = queryProcessor.process()) != null) {
         String resultJSON = resultSerializerFactory.serialize(result);
@@ -217,6 +218,16 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
       }
     }
 
+    {
+      SchemaResult schemaResult;
+
+      while ((schemaResult = schemaQueue.poll()) != null) {
+        String schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
+        LOG.debug("emitting {}", schemaResultJSON);
+        queryResult.emit(schemaResultJSON);
+      }
+    }
+
     queryProcessor.endWindow();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index a51dc6f..3d7bc7a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -137,6 +137,15 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
     catch (Exception ex) {
       LOG.error("Error joining monitor", ex);
     }
+
+    if (connection != null) {
+      connection.close();
+    }
+
+    if (client != null) {
+      client.close();
+    }
+
     super.teardown();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
index 9da82e9..3fb3780 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
@@ -64,33 +64,6 @@ public class WindowBoundedServiceTest
     Assert.assertTrue(counterRunnable.getCounter() > 0);
   }
 
-  @Test
-  public void exceptionTest() throws Exception
-  {
-    WindowBoundedService wbs = new WindowBoundedService(1,
-                                                        new ExceptionRunnable());
-
-    wbs.setup(null);
-    wbs.beginWindow(0);
-
-    boolean caughtException = false;
-
-    try {
-      Thread.sleep(500);
-    } catch (InterruptedException e) {
-      caughtException = true;
-    }
-
-    try {
-      wbs.endWindow();
-    } catch(Exception e) {
-      caughtException = true;
-    }
-
-    wbs.teardown();
-    Assert.assertEquals(true, caughtException);
-  }
-
   public static class CounterRunnable implements Runnable
   {
     private int counter = 0;