You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/10/21 22:02:19 UTC

apex-core git commit: APEXCORE-510 enforce emit() on the operator thread

Repository: apex-core
Updated Branches:
  refs/heads/master eaf041931 -> ce74fe78e


APEXCORE-510 enforce emit() on the operator thread


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ce74fe78
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ce74fe78
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ce74fe78

Branch: refs/heads/master
Commit: ce74fe78eb0dae5250bccbf1bb4ada755c3c2ebe
Parents: eaf0419
Author: Sanjay Pujare <sa...@datatorrent.com>
Authored: Thu Sep 29 05:53:06 2016 +0530
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Oct 21 14:01:51 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/api/DefaultOutputPort.java  |  17 +++
 .../datatorrent/api/DefaultOutputPortTest.java  | 136 +++++++++++++++++++
 2 files changed, 153 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/ce74fe78/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
index 078a372..71be22c 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
@@ -18,6 +18,9 @@
  */
 package com.datatorrent.api;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.Operator.Unifier;
 
@@ -31,7 +34,11 @@ import com.datatorrent.api.Operator.Unifier;
  */
 public class DefaultOutputPort<T> implements Operator.OutputPort<T>
 {
+  public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable";
+  private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class);
+
   private transient Sink<Object> sink;
+  private transient Thread operatorThread;
 
   /**
    * <p>Constructor for DefaultOutputPort.</p>
@@ -48,6 +55,12 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
    */
   public void emit(T tuple)
   {
+    // operatorThread stays null if setup() never ran or ran with the thread check disabled.
+    final Thread caller = Thread.currentThread();
+    if (operatorThread != null && caller != operatorThread) {
+      throw new IllegalStateException("Current thread " + caller.getName() +
+          " is different from the operator thread " + operatorThread.getName());
+    }
     sink.put(tuple);
   }
 
@@ -88,6 +101,10 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
   @Override
   public void setup(PortContext context)
   {
+    if (!Boolean.getBoolean(THREAD_AFFINITY_DISABLE_CHECK)) { // check stays on unless the property parses as true
+      operatorThread = Thread.currentThread(); // emit() rejects callers running on any other thread
+      logger.debug("Enforcing emit on {}", operatorThread.getName());
+    }
   }
 
   /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/apex-core/blob/ce74fe78/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
----------------------------------------------------------------------
diff --git a/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java b/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
new file mode 100644
index 0000000..43d04e5
--- /dev/null
+++ b/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.api;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Unit tests for the thread-affinity enforcement of {@code DefaultOutputPort.emit()}. */
+public class DefaultOutputPortTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPortTest.class);
+
+  private DefaultOutputPort<Object> port;
+  private Sink<Object> sink;
+  volatile boolean pass = false; // set by the helper thread, read by the test thread after join()
+
+  @Before
+  public void setupTest()
+  {
+    port = new DefaultOutputPort<>();
+    sink = new Sink<Object>()
+    {
+      private volatile int count = 0; // number of tuples received through put()
+      @Override
+      public void put(Object tuple)
+      {
+        count++;
+      }
+
+      @Override
+      public int getCount(boolean reset)
+      {
+        return count; // the reset flag is ignored; these tests only ever pass false
+      }
+    };
+    port.setSink(sink);
+  }
+
+  /**
+   * Same thread for setup() and emit(): the tuple must reach the sink without an exception.
+   */
+  @Test
+  public void testSameThreadForSetupAndEmit()
+  {
+    port.setup(null);
+    port.emit(null);
+    Assert.assertEquals(1, sink.getCount(false));
+  }
+
+  /**
+   * setup() not called: the unset operator thread must not cause an exception in emit().
+   */
+  @Test
+  public void testSetupNotCalledAndEmit()
+  {
+    port.emit(null);
+    Assert.assertEquals(1, sink.getCount(false));
+  }
+
+  /**
+   * Different thread for setup() and emit(): emit() must throw and nothing may reach the sink.
+   */
+  @Test
+  public void testDifferentThreadForSetupAndEmit() throws InterruptedException
+  {
+    System.clearProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK);  // do not suppress the check
+    pass = false;
+    port.setup(null);
+    Thread thread = new Thread("test-thread-xyz")
+    {
+      @Override
+      public void run()
+      {
+        try {
+          port.emit(null);
+        } catch (IllegalStateException ise) {
+          pass = ise.getMessage().startsWith("Current thread test-thread-xyz is different from the operator thread ");
+        }
+      }
+    };
+    thread.start();
+    thread.join();
+    Assert.assertTrue("same thread check didn't take place!", pass);
+    Assert.assertEquals(0, sink.getCount(false)); // no put() on sink
+  }
+
+  /**
+   * Different thread for setup() and emit() with the suppress-check property set: emit() must succeed.
+   */
+  @Test
+  public void testDifferentThreadForSetupAndEmit_CheckSuppressed() throws InterruptedException
+  {
+    System.setProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK, "true");  // suppress the check
+    try {
+      port.setup(null);
+    } finally {
+      System.clearProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK); // setup() has read it; do not leak it
+    }
+    pass = true;
+    Thread thread = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          port.emit(null);
+        } catch (IllegalStateException ise) {
+          pass = false;
+        }
+      }
+    };
+    thread.start();
+    thread.join();
+    Assert.assertEquals("same thread check was not suppressed!", 1, sink.getCount(false));
+    Assert.assertTrue("Exception was thrown!", pass);
+  }
+}