You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/10/21 22:02:19 UTC
apex-core git commit: APEXCORE-510 enforce emit() on the operator thread
Repository: apex-core
Updated Branches:
refs/heads/master eaf041931 -> ce74fe78e
APEXCORE-510 enforce emit() on the operator thread
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ce74fe78
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ce74fe78
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ce74fe78
Branch: refs/heads/master
Commit: ce74fe78eb0dae5250bccbf1bb4ada755c3c2ebe
Parents: eaf0419
Author: Sanjay Pujare <sa...@datatorrent.com>
Authored: Thu Sep 29 05:53:06 2016 +0530
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Oct 21 14:01:51 2016 -0700
----------------------------------------------------------------------
.../com/datatorrent/api/DefaultOutputPort.java | 17 +++
.../datatorrent/api/DefaultOutputPortTest.java | 136 +++++++++++++++++++
2 files changed, 153 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/ce74fe78/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
index 078a372..71be22c 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
@@ -18,6 +18,9 @@
*/
package com.datatorrent.api;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.Operator.Unifier;
@@ -31,7 +34,11 @@ import com.datatorrent.api.Operator.Unifier;
*/
public class DefaultOutputPort<T> implements Operator.OutputPort<T>
{
+ public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable";
+ private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class);
+
private transient Sink<Object> sink;
+ private transient Thread operatorThread;
/**
* <p>Constructor for DefaultOutputPort.</p>
@@ -48,6 +55,12 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
*/
public void emit(T tuple)
{
+ // operatorThread is null when setup() was never called or when the thread check was
+ // disabled at setup() time; in both cases the tuple is forwarded without any check.
+ if (operatorThread != null) {
+   final Thread caller = Thread.currentThread();
+   if (caller != operatorThread) {
+     // emit() was invoked from a thread other than the one that ran setup(): reject it
+     throw new IllegalStateException("Current thread " + caller.getName() +
+         " is different from the operator thread " + operatorThread.getName());
+   }
+ }
sink.put(tuple);
}
@@ -88,6 +101,10 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
@Override
public void setup(PortContext context)
{
+ // The system property named by THREAD_AFFINITY_DISABLE_CHECK switches the check off:
+ // operatorThread is then left untouched and emit() accepts calls from any thread.
+ if (Boolean.getBoolean(THREAD_AFFINITY_DISABLE_CHECK)) {
+   return;
+ }
+ // Remember the thread that runs setup(); emit() compares its caller against it.
+ final Thread current = Thread.currentThread();
+ operatorThread = current;
+ logger.debug("Enforcing emit on {}", current.getName());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/apex-core/blob/ce74fe78/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
----------------------------------------------------------------------
diff --git a/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java b/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
new file mode 100644
index 0000000..43d04e5
--- /dev/null
+++ b/api/src/test/java/com/datatorrent/api/DefaultOutputPortTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.api;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultOutputPortTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPortTest.class);
+
+ private DefaultOutputPort<Object> port;
+ private Sink<Object> sink;
+
+ @Before
+ public void setupTest()
+ {
+ port = new DefaultOutputPort<>();
+ sink = new Sink<Object>()
+ {
+ private volatile int count = 0;
+
+ @Override
+ public void put(Object tuple)
+ {
+ count++;
+ }
+
+ @Override
+ public int getCount(boolean reset)
+ {
+ return count;
+ }
+ };
+ port.setSink(sink);
+ }
+
+ /*
+ * Same thread for setup() and emit()
+ */
+ @Test
+ public void testSameThreadForSetupAndEmit()
+ {
+ port.setup(null);
+ port.emit(null);
+ Assert.assertEquals(1, sink.getCount(false));
+ // if it comes here it passes
+ }
+
+ /*
+ * setup() not called : null thread object should not cause exception
+ */
+ @Test
+ public void testSetupNotCalledAndEmit()
+ {
+ port.emit(null);
+ Assert.assertEquals(1, sink.getCount(false));
+ // if it comes here it passes
+ }
+
+ volatile boolean pass = false;
+
+ /*
+ * Different thread for setup() and emit()
+ */
+ @Test
+ public void testDifferentThreadForSetupAndEmit() throws InterruptedException
+ {
+ System.clearProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK); // do not suppress the check
+ pass = false;
+ port.setup(null);
+ Thread thread = new Thread("test-thread-xyz")
+ {
+ @Override
+ public void run()
+ {
+ try {
+ port.emit(null);
+ } catch (IllegalStateException ise) {
+ pass = ise.getMessage().startsWith("Current thread test-thread-xyz is different from the operator thread ");
+ }
+ }
+ };
+ thread.start();
+ thread.join();
+ Assert.assertTrue("same thread check didn't take place!", pass);
+ Assert.assertEquals(0, sink.getCount(false)); // no put() on sink
+ }
+
+ /*
+ * Different thread for setup() and emit() but suppress check property set
+ */
+ @Test
+ public void testDifferentThreadForSetupAndEmit_CheckSuppressed() throws InterruptedException
+ {
+ System.setProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK, "true"); // suppress the check
+ port.setup(null);
+ pass = true;
+ Thread thread = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ port.emit(null);
+ } catch (IllegalStateException ise) {
+ pass = false;
+ }
+ }
+ };
+ thread.start();
+ thread.join();
+ Assert.assertEquals("same thread check was not suppressed!", 1, sink.getCount(false));
+ Assert.assertTrue("Exception was thrown!", pass);
+ System.clearProperty(DefaultOutputPort.THREAD_AFFINITY_DISABLE_CHECK);
+ }
+}