You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2018/03/26 13:49:09 UTC

[apex-malhar] branch master updated: APEXMALHAR-2260.PythonExecutionOperator Implement python code as an Apex operator

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 55a6abb  APEXMALHAR-2260.PythonExecutionOperator Implement python code as an Apex operator
55a6abb is described below

commit 55a6abb64160e6d0228c26d347ff6c83b5bfd3ad
Author: Ananth Gundabattula <an...@apache.org>
AuthorDate: Wed Mar 7 17:00:09 2018 +1100

    APEXMALHAR-2260.PythonExecutionOperator Implement python code as an Apex operator
---
 pom.xml                                            |   1 +
 python/pom.xml                                     |  94 ++++
 .../apex/malhar/python/base/ApexPythonEngine.java  | 185 ++++++
 .../base/ApexPythonInterpreterException.java       |  35 ++
 .../python/base/BasePythonExecutionOperator.java   | 601 ++++++++++++++++++++
 .../python/base/PythonInterpreterConfig.java       |  37 ++
 .../malhar/python/base/WorkerExecutionMode.java    |  43 ++
 .../malhar/python/base/jep/InterpreterThread.java  | 626 +++++++++++++++++++++
 .../malhar/python/base/jep/InterpreterWrapper.java | 350 ++++++++++++
 .../malhar/python/base/jep/JepPythonEngine.java    | 621 ++++++++++++++++++++
 .../apex/malhar/python/base/jep/SpinPolicy.java    |  25 +
 .../apex/malhar/python/base/jep/package-info.java  |  22 +
 .../apex/malhar/python/base/package-info.java      |  22 +
 .../AbstractPythonExecutionPartitioner.java        |  75 +++
 .../PythonExecutionPartitionerType.java            |  24 +
 .../ThreadStarvationBasedPartitioner.java          |  92 +++
 .../python/base/partitioner/package-info.java      |  22 +
 .../requestresponse/EvalCommandRequestPayload.java |  72 +++
 .../GenericCommandsRequestPayload.java             |  36 ++
 .../requestresponse/MethodCallRequestPayload.java  |  48 ++
 .../base/requestresponse/PythonCommandType.java    |  27 +
 .../requestresponse/PythonInterpreterRequest.java  | 130 +++++
 .../requestresponse/PythonInterpreterResponse.java |  61 ++
 .../requestresponse/PythonRequestResponse.java     |  96 ++++
 .../ScriptExecutionRequestPayload.java             |  34 ++
 .../python/base/requestresponse/package-info.java  |  22 +
 .../python/base/util/NDArrayKryoSerializer.java    | 170 ++++++
 .../malhar/python/base/util/NDimensionalArray.java | 102 ++++
 .../base/util/PythonRequestResponseUtil.java       | 133 +++++
 .../malhar/python/PythonExecutorApplication.java   |  60 ++
 .../python/PythonExecutorApplicationTest.java      |  96 ++++
 .../malhar/python/PythonPayloadPOJOGenerator.java  | 169 ++++++
 .../apex/malhar/python/PythonProcessingPojo.java   |  75 +++
 .../apex/malhar/python/SimplePythonOpOperator.java | 101 ++++
 .../apex/malhar/python/base/jep/BaseJEPTest.java   | 211 +++++++
 .../python/base/jep/InterpreterThreadTest.java     | 176 ++++++
 .../python/base/jep/InterpreterWrapperTest.java    |  97 ++++
 .../python/base/jep/JepPythonEngineTest.java       | 172 ++++++
 .../apex/malhar/python/test/BasePythonTest.java    |  38 ++
 .../malhar/python/test/JepPythonTestContext.java   |  37 ++
 .../python/test/PythonAvailabilityTestRule.java    |  80 +++
 python/src/test/resources/META-INF/properties.xml  |  28 +
 python/src/test/resources/factorial.py             |  49 ++
 python/src/test/resources/log4j.properties         |  34 ++
 44 files changed, 5229 insertions(+)

diff --git a/pom.xml b/pom.xml
index c8925c4..0f62475 100644
--- a/pom.xml
+++ b/pom.xml
@@ -216,6 +216,7 @@
     <module>contrib</module>
     <module>kafka</module>
     <module>kudu</module>
+    <module>python</module>
     <module>examples</module>
   </modules>
 
diff --git a/python/pom.xml b/python/pom.xml
new file mode 100755
index 0000000..9a2d13f
--- /dev/null
+++ b/python/pom.xml
@@ -0,0 +1,94 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-python</artifactId>
+  <name>Apex library python support</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <jep-version>3.7.0</jep-version>
+    <jepInstallPath>/usr/local/lib/python3.5/site-packages/jep</jepInstallPath> <!-- Purposefully empty as it is ideally set from command line basing on env but
+     avoid errors in IDEs validating a pom.xml which requires a property to be set when referred in the configs-->
+    <disruptor-queue-conversant-media-version>1.2.10</disruptor-queue-conversant-media-version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>black.ninia</groupId>
+      <artifactId>jep</artifactId>
+      <version>${jep-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.conversantmedia</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor-queue-conversant-media-version}</version>
+      <classifier>jdk7</classifier><!-- Set classifier to jdk8 when malhar switches to JDK 8 -->
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>once</forkMode>
+          <argLine>-Djava.library.path=${jepInstallPath}</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java
new file mode 100644
index 0000000..6370e32
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+/**
+ * Defines the methods that needs to be implemented by the Python Engine implementations. The first implementation
+ *  takes the approach of in-memory interpreter using JEP. Other possibilities are using Py4J which is an inter process
+ *   communication model. An Apex operator would use an instance of the Python engine implementations to run
+ * python code using the chosen engine.
+ */
+public interface ApexPythonEngine
+{
+  /***
+   * Used to perform any pre interpreter processing.
+   * @param preInitConfigs The configuration that is going to be used by the interpreter
+   * @throws ApexPythonInterpreterException if there is an issue in executing the pre interpreter logic
+   */
+  void preInitInterpreter(Map<PythonInterpreterConfig,Object> preInitConfigs) throws ApexPythonInterpreterException;
+
+  /***
+   * Starts the interpreter.
+   * @throws ApexPythonInterpreterException if library not locatable or any other issue starting the interpreter
+   */
+  void startInterpreter() throws ApexPythonInterpreterException;
+
+  /***
+   * Used to perform any logic that needs to be executed after the interpreter is started but before any tuples start
+   *  getting processed. Example, setting the starting state of the variables that are used in tuple processing.
+   * @throws ApexPythonInterpreterException
+   */
+  void postStartInterpreter() throws ApexPythonInterpreterException;
+
+  /***
+   * Runs a series of commands. The implementation engine could make use of a worker pool to execute the command.
+   * @param executionMode Whether these commands need to be run on all worker thread or any of the worker thread.Please see
+   *    *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that Sticky worker might be required since python interpreter
+   *                 state is accumulated over as the commands run and a command can reference a variable created in a
+   *                 previous command etc. Such references might want to route all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param request Represents the request to be processed.
+   * @return A map with key as the command run and boolean as the value. True represents that the command successfully
+   *  run.
+   * @throws ApexPythonInterpreterException if interrupted or if the command cannot be executed
+   */
+  Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode, long windowId, long requestId,
+      PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
+
+  /***
+   * Executes a method call
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
+   *    *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that Sticky worker might be required since python interpreter
+   *                 state is accumulated over as the commands run and a command can reference a variable created in a
+   *                 previous command etc. Such references might want to route all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param req Represents the request to be processed.
+   * @param <T>
+   * @return A map containing the worker ID as key and boolean as successful or not
+   * @throws ApexPythonInterpreterException
+   */
+  <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException;
+
+  /***
+   * Executes a script that is locatable via a file path
+   * @param executionMode  If the method call needs to be invoked on all workers or any single worker. Please see
+   *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that Sticky worker might be required since python interpreter
+   *                 state is accumulated over as the commands run and a command can reference a variable created in a
+   *                 previous command etc. Such references might want to route all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param request Represents the request to be processed.
+   * @return A map containing the worker ID as key and boolean as successful or not
+   * @throws ApexPythonInterpreterException
+   */
+  Map<String,PythonRequestResponse<Void>>  executeScript(WorkerExecutionMode executionMode,long windowId,long requestId,
+      PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
+
+  /***
+   * Evaluates a string as a python expression and also supports passing in variables from JVM to the python interpreter
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
+   *    *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that Sticky worker might be required since python interpreter
+   *                 state is accumulated over as the commands run and a command can reference a variable created in a
+   *                 previous command etc. Such references might want to route all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param req Represents the request to be processed.
+   * @param <T> Java templating signature
+   * @return A map containing the worker ID as key and boolean as successful or not
+   * @throws ApexPythonInterpreterException
+   */
+  <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode, long windowId, long requestId,
+      PythonInterpreterRequest<T> req)  throws ApexPythonInterpreterException;
+
+  /***
+   * @return The queue that holds all of the straggler responses.
+   */
+  BlockingQueue<PythonRequestResponse> getDelayedResponseQueue();
+
+  void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue);
+
+  /***
+   * @return The number of times the engine could not process a request as there were no free worker threads and hence
+   *  had to return null
+   */
+  long getNumStarvedReturns();
+
+
+  void setNumStarvedReturns(long numStarvedReturns);
+
+  /**
+   * Returns all of the commands that were executed on all of the worker nodes.
+   * @return History of all commands executed in sequence
+   */
+  List<PythonRequestResponse> getCommandHistory();
+
+  void setCommandHistory(List<PythonRequestResponse> commandHistory);
+
+  void stopInterpreter() throws ApexPythonInterpreterException;
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java
new file mode 100644
index 0000000..be64d3e
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+/**
+ * An exception used to denote issues when using the ApexPython Engine.
+ */
+public class ApexPythonInterpreterException extends RuntimeException
+{
+  public ApexPythonInterpreterException(Throwable cause)
+  {
+    super(cause);
+  }
+
+  public ApexPythonInterpreterException(String message)
+  {
+    super(message);
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java b/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java
new file mode 100644
index 0000000..cfc2056
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.jep.InterpreterThread;
+import org.apache.apex.malhar.python.base.jep.JepPythonEngine;
+import org.apache.apex.malhar.python.base.jep.SpinPolicy;
+import org.apache.apex.malhar.python.base.partitioner.AbstractPythonExecutionPartitioner;
+import org.apache.apex.malhar.python.base.partitioner.PythonExecutionPartitionerType;
+import org.apache.apex.malhar.python.base.partitioner.ThreadStarvationBasedPartitioner;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/***
+ * <p>The operator provides a mechanism to execute arbitrary python code by using the {@link JepPythonEngine} as its
+ *  default engine.</p>
+ *
+ * <p>See {@link JepPythonEngine} and {@link InterpreterThread} for more detailed javadocs about the interpreter
+ *  itself and the API patterns possible through this engine</p>
+ *
+ * <p>Note that the JVM option of the library path needs to be set to a value which contains the JEP dynamic library.
+ *  For example, -Djava.library.path=/usr/local/lib/python3.5/site-packages/jep assuming that the JEP library is
+ *   installed at the above location
+ * </p>
+ *
+ * <p>If using CPython libraries which involve global variables, please use the
+ *  {@link PythonInterpreterConfig#PYTHON_SHARED_LIBS} as one of the keys to specify this library as a shared library.
+ *  Note that the interpreter configs are passed via {@link ApexPythonEngine#preInitInterpreter(Map)} method.
+ *  Overrriding the {@link BasePythonExecutionOperator#getPreInitConfigurations()} and specifying the configs can
+ *   help in specifying the shared libraries that are loaded by all the interpreter threads accordingly.</p>
+ *
+ * <p>The operator comes with the following limitations
+ * <ol>
+ *   <li>Cannot be used in THREAD LOCAL MODE where the downstream operator is using a different version of the
+ *    the python interpreter</li>
+ *   <li>In certain use cases the operator cannot be used CONTAINER LOCAL MODE when there are global defs in the
+ *    CPython library that is being used and the downstream operator depends on those globals ( even though downstream
+ *     is using the same python version of the upstream operator</li>
+ *   <li>Only CPython libraries are supported and uses the JNI mechanisms to use the CPython libraries</li>
+ *   <li>Complex data types cannot be automatically translated to the Python interpreter space. Current
+ *    workaround involves copying the basic types and build the complex type using python code and functionality
+ *     provided by the {@link JepPythonEngine} </li>
+ *   <li>Numpy arrays need to be specified as {@link NDimensionalArray} and the engine automatically translates them
+ *    to a Numpy array. See {@link NDimensionalArray#toNDArray()} for more details</li>
+ * </ol>
+ * </p>
+ *  <p>
+ *    Shared libraries enable sharing of global modules across interpreter workers in a work pool.
+ *     The following snippet of code illustrated the registering of numpy as a shared module.
+ *     <code>
+ *       Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+ *       Set<String> sharedLibsList = new HashSet<>();
+ *       sharedLibsList.add("numpy");
+ *       preInitConfigs.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibsList);
+ *     </code>
+ *     Passing the above config in overriding {@link BasePythonExecutionOperator#getPreInitConfigurations()} will help
+ *      in using modules like numpy which have global shared variables.
+ *  </p>
+ *  <p>
+ *    If there are errors while running the operator code with exceptions as module 'jep' has no attribute 'java_import_hook'
+ *    This essentially is generated when JEP is not able to load a library that the python code is using. Ensure
+ *    that the PYTHONPATH environment variable is set pointing to the right location. This is especially
+ *     required when there are multiple python versions in the code.
+ *  </p>
+ *
+ * <p>
+ *     Java 9 is not yet supported. Support only exists for Java 8 and Java 7 runtimes.
+ * </p>
+ *
+ *  <p>For very low time SLA requirements, it is encouraged to set the spin policy to be busy wait using the
+ *   following configuration snippet for the JEP engine
+ *   <code>
+ *       Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+ *       preInitConfigs.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, ""+ SpinPolicy.BUSY_SPIN.name());
+ *   </code>
+ *   This mapping can be set by overriding {@link BasePythonExecutionOperator#getPreInitConfigurations()}. For more
+ *    details refer {@link SpinPolicy}.
+ *  </p>
+ * @param <T> Represents the incoming tuple.
+ */
+public class BasePythonExecutionOperator<T> extends BaseOperator implements
+    Operator.ActivationListener<Context.OperatorContext>, Partitioner<BasePythonExecutionOperator>,
+    Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(BasePythonExecutionOperator.class);
+
+  /*  a rolling counter for all requests in the current window */
+  protected transient long requestCounterForThisWindow = 0;
+
+  protected transient long responseCounterForThisWindow = 0;
+
+  /***
+   * Blocks the operator thread in endWindow until all of the tuples for the window are successfully emitted. Note
+   *  that these might be emitted from the stragglers port as well.
+   */
+  protected boolean blockAtEndOfWindowForStragglers = false;
+
+  protected transient long currentWindowId = 0;
+
+  private long numberOfRequestsProcessedPerCheckpoint = 0;
+
+  /*** Configuration used to decide if a new operator needs to be spawned while dynamic partitioning planning is taking
+   place. This represents the threshold for percent number of times the requests were starved when no threads were
+    available to process a request. See {@link ThreadStarvationBasedPartitioner}
+   */
+  private float starvationPercentBeforeSpawningNewInstance = 30;
+
+  private transient ApexPythonEngine apexPythonEngine;
+
+  /*** number of workers in the worker pool */
+  private int workerThreadPoolSize = 3;
+
+  /** Can be used to set the additional File system paths that the interpreter can use to locate modules. Use
+   *  commas as separator when representing a collection of strings */
+  private String interpreterEngineIncludePaths;
+
+  /** Can be used to add shared modules that an interpreter engine can load if working within JVM memory space. Use
+   *  commas as separator when representing a collection of strings */
+  private String interpreterEngineSharedLibNames;
+
+  /** Used to represent the behaviour of the interpreter thread if no requests are found in its queue. Used to tune
+   *  for extremely sensitive SLAs for computation. See {@link SpinPolicy} for possible string values representation */
+  private String idleInterpreterSpinPolicy = SpinPolicy.SLEEP.name();
+
+  /** Used to represent the behaviour of the disruptor queue that processes request response dispatches. Tune this
+   *  to manage the low latency behaviors. See {@link SpinPolicy} for possible string value representation */
+  private String requestQueueWaitSpinPolicy = SpinPolicy.SLEEP.name();
+
+  private int sleepTimeInMSInCaseOfNoRequests = 5;
+
+
+  /*** Time for which the python engine will sleep to allow for the interpreter will sleep before worker can be used
+   *  for executing work requests.
+   */
+  private long sleepTimeDuringInterpreterBoot = 2000L;
+
+  private PythonExecutionPartitionerType partitionerType = PythonExecutionPartitionerType.THREAD_STARVATION_BASED;
+
+  private transient AbstractPythonExecutionPartitioner partitioner;
+
+  /*** port into which the stragglers will be emitted */
+  public final transient DefaultOutputPort<PythonRequestResponse> stragglersPort =
+      new com.datatorrent.api.DefaultOutputPort<>();
+
+  /*** Port into which the normal execution path will push the results into */
+  public final transient DefaultOutputPort<PythonRequestResponse> outputPort =
+      new com.datatorrent.api.DefaultOutputPort<>();
+
+  /*** Port into which all error tuples will be emitted into */
+  public final transient DefaultOutputPort<T> errorPort = new com.datatorrent.api.DefaultOutputPort<>();
+
+  private Object objectForLocking = new Object();
+
+  /*** A counter that is used to track how many times a request could not be serviced within a given window. Used
+   by the {@link ThreadStarvationBasedPartitioner} to spawn a new instance of the operator based on a configuration
+   threshold
+   */
+  @AutoMetric
+  private long numStarvedReturnsPerCheckpoint = 0;
+
+  @AutoMetric
+  private long numNullResponsesPerWindow = 0;
+
+  /***
+   * Represents all python commands that need to be run on a new instance of the operator after dynamic partitioning
+   */
+  private List<PythonRequestResponse> accumulatedCommandHistory = new ArrayList<>();
+
+  /***
+   * Processes the incoming tuple using the python engine that is injected. Also emits stragglers if any.
+   */
+  @InputPortFieldAnnotation
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      numberOfRequestsProcessedPerCheckpoint += 1;
+      requestCounterForThisWindow += 1;
+      emitStragglers();
+      try {
+        PythonRequestResponse result = processPythonCodeForIncomingTuple(tuple,getApexPythonEngine());
+        if ( result != null) {
+          responseCounterForThisWindow += 1;
+          outputPort.emit(result);
+        } else {
+          numNullResponsesPerWindow += 1;
+        }
+      } catch (ApexPythonInterpreterException e) {
+        responseCounterForThisWindow += 1; // Error tuples need to be accounted for totals
+        errorPort.emit(tuple);
+        LOG.debug("Error while processing tuple", e);
+      }
+    }
+  };
+
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    getApexPythonEngine().setNumStarvedReturns(0L);
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /***
+   * Used to emit the stragglers into the Stragglers port. Invoked either when a new tuple arrives or when Idle time
+   *  is detected on this operator.
+   */
+  private void emitStragglers()
+  {
+    LOG.debug("Emitting stragglers");
+    List<PythonRequestResponse> stragglerResponse = new ArrayList<>();
+    getApexPythonEngine().getDelayedResponseQueue().drainTo(stragglerResponse);
+    for (PythonRequestResponse aReqResponse : stragglerResponse) {
+      responseCounterForThisWindow += 1;
+      stragglersPort.emit(aReqResponse);
+    }
+  }
+
+  /***
+   * Instantiates the configured python engine. Only in-memory implementation provided for now
+   * @param context The operator context
+   * @return The python engine
+   */
+  protected ApexPythonEngine initApexPythonEngineImpl(Context.OperatorContext context)
+  {
+    JepPythonEngine jepPythonEngine = new JepPythonEngine("" + context.getId(),workerThreadPoolSize);
+    jepPythonEngine.setSleepTimeAfterInterpreterStart(getSleepTimeDuringInterpreterBoot());
+    LOG.debug("Initialized Python engine " + jepPythonEngine);
+    return jepPythonEngine;
+  }
+
+  /***
+   * Instantiates the partitioner. Only Thread Starvation based partitioner for now.
+   */
+  private void initPartitioner()
+  {
+    if (partitioner == null) {
+      synchronized (objectForLocking) {
+        if (partitioner == null) {
+          switch (partitionerType) {
+            default:
+            case THREAD_STARVATION_BASED:
+              partitioner = new ThreadStarvationBasedPartitioner(this);
+              break;
+          }
+        }
+      }
+    }
+  }
+
+  /***
+   * Starts the python engine and also sleeps for sometime ( configurable) to ensure that the interpreter is
+   *  completely booted up in memory. The time taken to boot the interpreter depends on the libraries that are loaded etc
+   *  and hence tune the sleeptimeToBoot parameter accordingly.
+   * @param context The Operator context
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    LOG.debug("Initializing the python interpreter setup");
+    apexPythonEngine = initApexPythonEngineImpl(context);
+    Map<PythonInterpreterConfig,Object> preInitConfigurations = getPreInitConfigurations();
+    apexPythonEngine.preInitInterpreter(preInitConfigurations);
+    apexPythonEngine.startInterpreter();
+    LOG.debug("Python interpreter started. Now invoking post interpreter logic");
+    processPostSetUpPythonInstructions(apexPythonEngine);
+    LOG.debug("Python post interpreter logic complete");
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    apexPythonEngine.stopInterpreter();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    requestCounterForThisWindow = 0;
+    responseCounterForThisWindow = 0;
+    numNullResponsesPerWindow = 0;
+    currentWindowId = windowId;
+  }
+
+  /***
+   * if blockAtEndOfWindowForStragglers is configured to be true ( via a setter or config ), then end window loops
+   *  until all the stragglers are emitted. If this configuration element is set to false, then there is no loop
+   *  waiting for the stragglers to be completed emitting. Note that there is a sleep in case the configuration
+   *  blockAtEndOfWindowForStragglers value is set to true between each bulk emit of all stragglers that arrived at that
+   *   point in time.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    if (responseCounterForThisWindow < requestCounterForThisWindow) {
+      LOG.debug("Detected stragglers and configured flag/state for blocking at end of window is " +
+          blockAtEndOfWindowForStragglers);
+      if (blockAtEndOfWindowForStragglers) {
+        LOG.debug("Trying to emit all stragglers before the next window can be processed");
+        while (responseCounterForThisWindow < requestCounterForThisWindow) {
+          emitStragglers();
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            LOG.debug("Thread interrupted while trying to emit all stragglers");
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public Collection<Partition<BasePythonExecutionOperator>> definePartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    initPartitioner();
+    return partitioner.definePartitions(partitions,context);
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<BasePythonExecutionOperator>> partitions)
+  {
+    initPartitioner();
+    partitioner.partitioned(partitions);
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    accumulatedCommandHistory.clear();
+    accumulatedCommandHistory.addAll(getApexPythonEngine().getCommandHistory());
+    numStarvedReturnsPerCheckpoint = getApexPythonEngine().getNumStarvedReturns();
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    getApexPythonEngine().setNumStarvedReturns(0L);
+    numberOfRequestsProcessedPerCheckpoint = 0;
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+  }
+
+  /***
+   * Override this to hook any logic that would be needed immediately after setup of the interpreter. Some of the
+   * actions include setting up an interpreter state which can be used across all tuple invocations
+   * @param pythonEngine
+   * @throws ApexPythonInterpreterException
+   */
+  public void processPostSetUpPythonInstructions(ApexPythonEngine pythonEngine) throws ApexPythonInterpreterException
+  {
+    apexPythonEngine.postStartInterpreter();
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    emitStragglers();
+  }
+
+  public ApexPythonEngine getApexPythonEngine()
+  {
+    return apexPythonEngine;
+  }
+
+  public void setApexPythonEngine(ApexPythonEngine apexPythonEngine)
+  {
+    this.apexPythonEngine = apexPythonEngine;
+  }
+
+  /***
+   * Override this method to perform any meaningful work in python interpreter space.
+   * @param input The incoming tuple
+   * @param pythonEngineRef The impementation of the Python interpreter that can be used to execute python commands
+   * @return The result of execution wrapped as a PythonRequestResponse Object
+   * @throws ApexPythonInterpreterException if interrupted or no workers avaialble to execute the python code.
+   */
+  public PythonRequestResponse processPythonCodeForIncomingTuple(T input, ApexPythonEngine pythonEngineRef)
+    throws ApexPythonInterpreterException
+  {
+    return null;
+  }
+
+  /***
+   * See constants defined in {@link PythonInterpreterConfig} for a list of keys available. Application properties xml
+   *  file can be used to set the values for the interpreter configuration. Override this method
+   *  to refine the configuration that is being used to start the interpreter instance. Please see test
+   *   application implemented in the test code for example usage.
+   * @return
+   */
+  public Map<PythonInterpreterConfig,Object> getPreInitConfigurations()
+  {
+    Map<PythonInterpreterConfig,Object> configsForInterpreter = new HashMap<>();
+    String includePaths = getInterpreterEngineIncludePaths();
+    if (includePaths != null) {
+      List<String> interpreterIncludePaths = new ArrayList<>();
+      interpreterIncludePaths.addAll(Arrays.asList(includePaths.split(",")));
+      configsForInterpreter.put(PythonInterpreterConfig.PYTHON_INCLUDE_PATHS, interpreterIncludePaths);
+    }
+    String sharedLibs = getInterpreterEngineSharedLibNames();
+    if (sharedLibs != null) {
+      List<String> sharedLibNames = new ArrayList<>();
+      sharedLibNames.addAll(Arrays.asList(sharedLibs.split(",")));
+      configsForInterpreter.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibNames);
+    }
+    configsForInterpreter.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, getIdleInterpreterSpinPolicy());
+    configsForInterpreter.put(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY, getRequestQueueWaitSpinPolicy());
+    configsForInterpreter.put(PythonInterpreterConfig.SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS,
+        getSleepTimeInMSInCaseOfNoRequests());
+    return configsForInterpreter;
+  }
+
+  public long getSleepTimeDuringInterpreterBoot()
+  {
+    return sleepTimeDuringInterpreterBoot;
+  }
+
+  public void setSleepTimeDuringInterpreterBoot(long sleepTimeDuringInterpreterBoot)
+  {
+    this.sleepTimeDuringInterpreterBoot = sleepTimeDuringInterpreterBoot;
+  }
+
+  public int getWorkerThreadPoolSize()
+  {
+    return workerThreadPoolSize;
+  }
+
+  public void setWorkerThreadPoolSize(int workerThreadPoolSize)
+  {
+    this.workerThreadPoolSize = workerThreadPoolSize;
+  }
+
+  public PythonExecutionPartitionerType getPartitionerType()
+  {
+    return partitionerType;
+  }
+
+  public void setPartitionerType(PythonExecutionPartitionerType partitionerType)
+  {
+    this.partitionerType = partitionerType;
+  }
+
+  public long getNumberOfRequestsProcessedPerCheckpoint()
+  {
+    return numberOfRequestsProcessedPerCheckpoint;
+  }
+
+  public void setNumberOfRequestsProcessedPerCheckpoint(long numberOfRequestsProcessedPerCheckpoint)
+  {
+    this.numberOfRequestsProcessedPerCheckpoint = numberOfRequestsProcessedPerCheckpoint;
+  }
+
+  public AbstractPythonExecutionPartitioner getPartitioner()
+  {
+    return partitioner;
+  }
+
+  public void initPartitioner(AbstractPythonExecutionPartitioner partitioner)
+  {
+    this.partitioner = partitioner;
+  }
+
+  public float getStarvationPercentBeforeSpawningNewInstance()
+  {
+    return starvationPercentBeforeSpawningNewInstance;
+  }
+
+  public void setStarvationPercentBeforeSpawningNewInstance(float starvationPercentBeforeSpawningNewInstance)
+  {
+    this.starvationPercentBeforeSpawningNewInstance = starvationPercentBeforeSpawningNewInstance;
+  }
+
+  public long getNumStarvedReturns()
+  {
+    return numStarvedReturnsPerCheckpoint;
+  }
+
+  public void setNumStarvedReturns(long numStarvedReturns)
+  {
+    this.numStarvedReturnsPerCheckpoint = numStarvedReturns;
+  }
+
+  public List<PythonRequestResponse> getAccumulatedCommandHistory()
+  {
+    return accumulatedCommandHistory;
+  }
+
+  public void setAccumulatedCommandHistory(List<PythonRequestResponse> accumulatedCommandHistory)
+  {
+    this.accumulatedCommandHistory = accumulatedCommandHistory;
+  }
+
+  public boolean isBlockAtEndOfWindowForStragglers()
+  {
+    return blockAtEndOfWindowForStragglers;
+  }
+
+  public void setBlockAtEndOfWindowForStragglers(boolean blockAtEndOfWindowForStragglers)
+  {
+    this.blockAtEndOfWindowForStragglers = blockAtEndOfWindowForStragglers;
+  }
+
+  public String getInterpreterEngineIncludePaths()
+  {
+    return interpreterEngineIncludePaths;
+  }
+
+  public void setInterpreterEngineIncludePaths(String interpreterEngineIncludePaths)
+  {
+    this.interpreterEngineIncludePaths = interpreterEngineIncludePaths;
+  }
+
+  public String getInterpreterEngineSharedLibNames()
+  {
+    return interpreterEngineSharedLibNames;
+  }
+
+  public void setInterpreterEngineSharedLibNames(String interpreterEngineSharedLibNames)
+  {
+    this.interpreterEngineSharedLibNames = interpreterEngineSharedLibNames;
+  }
+
+  public String getIdleInterpreterSpinPolicy()
+  {
+    return idleInterpreterSpinPolicy;
+  }
+
+  public void setIdleInterpreterSpinPolicy(String idleInterpreterSpinPolicy)
+  {
+    this.idleInterpreterSpinPolicy = idleInterpreterSpinPolicy;
+  }
+
+  public String getRequestQueueWaitSpinPolicy()
+  {
+    return requestQueueWaitSpinPolicy;
+  }
+
+  public void setRequestQueueWaitSpinPolicy(String requestQueueWaitSpinPolicy)
+  {
+    this.requestQueueWaitSpinPolicy = requestQueueWaitSpinPolicy;
+  }
+
+  public int getSleepTimeInMSInCaseOfNoRequests()
+  {
+    return sleepTimeInMSInCaseOfNoRequests;
+  }
+
+  public void setSleepTimeInMSInCaseOfNoRequests(int sleepTimeInMSInCaseOfNoRequests)
+  {
+    this.sleepTimeInMSInCaseOfNoRequests = sleepTimeInMSInCaseOfNoRequests;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java b/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java
new file mode 100644
index 0000000..f904d98
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.python.base.jep.JepPythonEngine;
+
+/***
+ * Used as key configs while passing the pre interpreter configuration. See
+ *  {@link JepPythonEngine#preInitInterpreter(Map)}
+ */
+public enum PythonInterpreterConfig
+{
+  PYTHON_INCLUDE_PATHS,
+  PYTHON_SHARED_LIBS,
+  IDLE_INTERPRETER_SPIN_POLICY,
+  REQUEST_QUEUE_WAIT_SPIN_POLICY,
+  SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS;
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java b/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java
new file mode 100644
index 0000000..e024ab0
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+
+/***
+ * <p>Used to specify if a given API invocation in the in-memory interpreter is going to be invoked for all the worker
+ *  threads, a sticky thread or just any one thread.
+ *
+ *  - {@link WorkerExecutionMode#BROADCAST} is to be used when the
+ *   command is resulting in a state of the interpreter which has to be used in subsequent calls. For example,
+ *   deserializing a machine learning model can be used as a BROADCAST model as the scoring can then be invoked across
+ *   all worker threads if required.
+ *  - {@link WorkerExecutionMode#ANY} Represents a state wherein any worker can be used to execute the code.
+ *   Example would be scoring an incoming tuple on which the model has already been deserialized across all nodes
+ *  - {@link WorkerExecutionMode#STICKY} Use STICKY if the same worker needs to service the request.
+ *    The downside of this is that it may or may not complete on time and depends on the queue length.</p>
+ *
+ *   <p><b>Ensure the {@link PythonInterpreterRequest#hashCode()} is overridden if STICKY is chosen </b></p>
+ */
+public enum WorkerExecutionMode
+{
+  BROADCAST,
+  ANY,
+  STICKY
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java
new file mode 100644
index 0000000..75fa552
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import jep.Jep;
+import jep.JepConfig;
+import jep.JepException;
+import jep.NDArray;
+
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.PYTHON_INCLUDE_PATHS;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.PYTHON_SHARED_LIBS;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS;
+
+/**
+ * <p>
+ * Represents a python interpreter instance embedded in JVM memory using the JEP ( Java embedded Python ) engine.
+ * JEP uses JNI wrapper around the embedded Python instance. JEP mandates that the thread that created the JEP
+ * instance is the only thread that can perform method calls on the embedded interpreter. This requires
+ * the Apex operator implementation to decouple JEP execution logic from the operator processing main thread.
+ * <b>Note that this embedded python is an interpreter and this essentially means the state of the interpreter
+ * is maintained across all calls to the interpreter.</b>
+ * The threaded implementation provides the following main functionalities
+ * <ol>
+ *   <li>An evaluation expression that can interpret a string as a python command. The user can also set
+ *    variable values that are
+ *      <ul>
+ *         <li>Transferred to the interpreter with the same variable names</li>
+ *         <li>Garbage collected from the python interpreter space</li>
+ *      </ul>
+ *   </li>
+ *   <li>
+ *     A method call invocation wherein parameters can be sent to the previously defined method (the method must have to
+ *      be defined perhaps via an eval expression or a previous execute script call)
+ *   </li>
+ *   <li>A script call command that can execute a script. There is currently no support to pass params to scripts</li>
+ *   <li>A handy mechanism to execute a series of commands. Note that this is a simple wrapper around the
+ *    eval expression. The main difference here is that there are no user variables substitution used in
+ *     this model. This is useful for statements like import ( ex: import numpy as np ) which does not require
+ *      user variables conversion</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ *   The logic is executed using a request and response queue model. The thread keeps consuming from a request queue
+ *   and submits results to a response queue.
+ * </p>
+ * <p>
+ *   Note that all outputs are being redirected to the standard logger. Hence using statements like print(secret)
+ *   needs to be avoided as the result of the print command is captured in the log.
+ * </p>
+ * <p>
+ *   When using Cpython libraries like numpy, <b>ensure you first register numpy as a shared library</b> before using it
+ *   in even import statements. Not doing so will result in very obscure errors.
+ * </p>
+ */
+
+public class InterpreterThread implements Runnable
+{
+  private static final Logger LOG = LoggerFactory.getLogger(InterpreterThread.class);
+
+  /* Name of the dynamically loaded JNI library */
+  public static final String JEP_LIBRARY_NAME = "jep";
+
+  /* The string command which will be used to delete python variables after they are used. */
+  public static final String PYTHON_DEL_COMMAND = "del ";
+
+  public transient Jep JEP_INSTANCE;
+
+  /* Used by the operator thread or other threads to mark the stopping of processing of the interpreter command loop */
+  private transient volatile boolean isStopped = false;
+
+  /* Used to represent the current state of this thread whether it is currently busy executing a command */
+  private transient volatile boolean busyFlag = false;
+
+  /* Represents the default amount of time that this thread will wait to read a command from the request queue */
+  private long timeOutToPollFromRequestQueue = 1;
+
+  private TimeUnit timeUnitsToPollFromRequestQueue = TimeUnit.MILLISECONDS;
+
+  private transient volatile BlockingQueue<PythonRequestResponse> requestQueue;
+
+  private transient volatile BlockingQueue<PythonRequestResponse> responseQueue;
+
+  /* An id that can be useful while logging statements */
+  private String threadID;
+
+  /* Whether this thread should sleep for a few moments if there are no requests are keep checking the request queue */
+  private SpinPolicy spinPolicy = SpinPolicy.SLEEP;
+
+  /* Holds the configs that are used to initialize the interpreter thread. Examples of config are shared libraries and
+  include paths for the interpreter. The key is one of the constants defined in PythonInterpreterConfig and value
+    is specific to the config type that is being set.
+   */
+  private Map<PythonInterpreterConfig,Object> initConfigs = new HashMap<>();
+
+  /* Used as a flag to denote an error situation in the interpreter so that the next set of commands to run
+   *  an empty/null eval expression to clear any erraneous state  */
+  private boolean errorEncountered = false;
+
+  private long sleepTimeMsInCaseOfNoRequests = 1;
+
+  /***
+   * Constructs an interpreter thread instance. Note that the constructor does not start the interpreter in memory yet.
+   * @param requestQueue The queue from which requests will be processed from.
+   * @param responseQueue The queue into which the responses will be written into
+   * @param threadID An identifier for this thread name for efficient logging markers
+   */
+  public InterpreterThread(BlockingQueue<PythonRequestResponse> requestQueue,
+      BlockingQueue<PythonRequestResponse> responseQueue,String threadID)
+  {
+    this.requestQueue = requestQueue;
+    this.responseQueue = responseQueue;
+    this.threadID = threadID;
+  }
+
+  /***
+   * Loads the JEP dynamic library for the JVM to use the JNI bridge into the interpreter
+   * @throws ApexPythonInterpreterException if the library could not be loaded or located
+   */
+  private void loadMandatoryJVMLibraries() throws ApexPythonInterpreterException
+  {
+    LOG.info("Java library path being used for Interpreted ID " +  threadID + " " +
+        System.getProperty("java.library.path"));
+    try {
+      System.loadLibrary(JEP_LIBRARY_NAME);
+    } catch (Exception e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    LOG.info("JEP library loaded successfully");
+  }
+
+
+  public Jep getEngineReference() throws ApexPythonInterpreterException
+  {
+    return JEP_INSTANCE;
+  }
+
+  /***
+   * Executes the logic required before the start of the interpreter. In this case, it is just registering of the
+   * configs which are to be used when the interpreter is about to load
+   * @param preInitConfigs
+   * @throws ApexPythonInterpreterException
+   */
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
+    throws ApexPythonInterpreterException
+  {
+    initConfigs.putAll(preInitConfigs);
+  }
+
+  /***
+   * Starts the interpreter by loading the shared libraries
+   * @throws ApexPythonInterpreterException if the interpreter could not be started
+   */
+  public void startInterpreter() throws ApexPythonInterpreterException
+  {
+    Thread.currentThread().setName(threadID);
+    Thread.currentThread().setPriority(Thread.MAX_PRIORITY); // To allow for time aware calls
+    loadMandatoryJVMLibraries();
+    JepConfig config = new JepConfig()
+        .setRedirectOutputStreams(true)
+        .setInteractive(false)
+        .setClassLoader(Thread.currentThread().getContextClassLoader()
+    );
+    if (initConfigs.containsKey(PYTHON_INCLUDE_PATHS)) {
+      List<String> includePaths = (List<String>)initConfigs.get(PYTHON_INCLUDE_PATHS);
+      if ( includePaths != null) {
+        LOG.info("Adding include path for the in-memory interpreter instance");
+        for (String anIncludePath: includePaths) {
+          config.addIncludePaths(anIncludePath);
+        }
+      }
+    }
+    if (initConfigs.containsKey(PYTHON_SHARED_LIBS)) {
+      Set<String> sharedLibs = (Set<String>)initConfigs.get(PYTHON_SHARED_LIBS);
+      if ( sharedLibs != null) {
+        config.setSharedModules(sharedLibs);
+        LOG.info("Loaded " + sharedLibs.size() + " shared libraries as config");
+      }
+    } else {
+      LOG.info(" No shared libraries loaded");
+    }
+    if (initConfigs.containsKey(IDLE_INTERPRETER_SPIN_POLICY)) {
+      spinPolicy = SpinPolicy.valueOf((String)initConfigs.get(IDLE_INTERPRETER_SPIN_POLICY));
+      LOG.debug("Configuring spin policy to be " + spinPolicy);
+    }
+    if (initConfigs.containsKey(SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS)) {
+      sleepTimeMsInCaseOfNoRequests = (Long)initConfigs.get(SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS);
+      LOG.debug("Configuring sleep time for no requests situation to be " + sleepTimeMsInCaseOfNoRequests);
+    }
+    try {
+      LOG.info("Launching the in-memory interpreter");
+      JEP_INSTANCE = new Jep(config);
+    } catch (JepException e) {
+      LOG.error(e.getMessage(),e); // Purposefully logging as this will help in startup issues being captured inline
+      throw new ApexPythonInterpreterException(e);
+    }
+  }
+
+  /***
+   * Runs a series of interpreter commands. Note that no params can be passed from the JVM to the python interpreter
+   * space
+   * @param commands The series of commands that will be executed sequentially
+   * @return A map containing the result of execution of each of the commands. The command is the key that was
+   * passed as input and the value is a boolean whether the command was executed successfully
+   */
+  private Map<String,Boolean> runCommands(List<String> commands)
+  {
+    LOG.debug("Executing run commands");
+    Map<String,Boolean> resultsOfExecution = new HashMap<>();
+    for (String aCommand : commands) {
+      LOG.debug("Executing command " + aCommand);
+      try {
+        resultsOfExecution.put(aCommand,JEP_INSTANCE.eval(aCommand));
+      } catch (JepException e) {
+        resultsOfExecution.put(aCommand,Boolean.FALSE);
+        errorEncountered = true;
+        LOG.error("Error while running command " + aCommand, e);
+        return resultsOfExecution;
+      }
+    }
+    return resultsOfExecution;
+  }
+
+  /***
+   * Executes a method call by passing any parameters to the method call. The params are passed in the order they are
+   *  set in the list.
+   * @param nameOfGlobalMethod Name of the method to invoke
+   * @param argsToGlobalMethod Arguments to the method call. Typecasting is interpreted at runtime and hence multiple
+   *                           types can be sent as part of the parameter list
+   * @param type The class of the return parameter. Note that in some cases the return type will be the highest possible
+   *             bit size. For example addition of tow ints passed in might return a Long by the interpreter.
+   * @param <T> Represents the type of the return parameter
+   * @return The response from the method call that the python method returned
+   */
+  private <T> T executeMethodCall(String nameOfGlobalMethod, List<Object> argsToGlobalMethod, Class<T> type)
+  {
+    LOG.debug("Executing method call invocation");
+    try {
+      if ((argsToGlobalMethod != null) && (argsToGlobalMethod.size() > 0)) {
+        List<Object> paramsToPass = argsToGlobalMethod;
+        List<Object> modifiedParams = new ArrayList<>();
+        for ( Object aMethodParam: argsToGlobalMethod) {
+          if (argsToGlobalMethod.get(0) instanceof NDimensionalArray) {
+            LOG.debug(aMethodParam + " is of type NDimensional array and hence converting to JEP NDArray");
+            modifiedParams.add(((NDimensionalArray)aMethodParam).toNDArray());
+          } else {
+            modifiedParams.add(aMethodParam);
+          }
+        }
+        LOG.debug("Executing method" + nameOfGlobalMethod + " with " + modifiedParams.size() + " parameters");
+        return type.cast(JEP_INSTANCE.invoke(nameOfGlobalMethod,modifiedParams.toArray()));
+      } else {
+        LOG.debug("Executing " + argsToGlobalMethod + " with no parameters");
+        return type.cast(JEP_INSTANCE.invoke(nameOfGlobalMethod,new ArrayList<>().toArray()));
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while executing method " + nameOfGlobalMethod, e);
+    }
+    return null;
+  }
+
+  /***
+   * Executes a python script which can be located in the path
+   * @param pathToScript The path to the script
+   * @return true if the script invocation was successfull or false otherwise
+   */
+  private boolean executeScript(String pathToScript)
+  {
+    LOG.debug("Executing script at path " + pathToScript);
+    try {
+      JEP_INSTANCE.runScript(pathToScript);
+      return true;
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error(" Error while executing script " + pathToScript, e);
+    }
+    return false;
+  }
+
+  /***
+   * Evaluates a string expression by passing in any variable subsitution into the Interpreter space if required. Also
+   * handles the garbage collection of the variables passed and offers a configurable way to delete any variable created
+   *  as part of the evaluation expression.
+   * @param command The string equivalent of the command
+   * @param variableToExtract The name of the variable that would need to be extracted from the python interpreter space
+   *                          to the JVM space.
+   * @param variableSubstituionParams Key value pairs representing the variables that need to be passed into the
+   *                                  interpreter space and are part of the eval expression.
+   * @param deleteExtractedVariable if the L.H.S. of an assignment expression variable needs to be deleted. This is
+   *                                essentially the variable that is being requested to extract i.e. the second
+   *                                parameter to this method.
+   * @param expectedReturnType Class representing the expected return type
+   * @param <T> Template signature for the expected return type
+   * @return The value that is extracted from the interpreter space ( possibly created as part of the eval expression or
+   *  otherwise ). Returns null if any error
+   */
+  private <T> T eval(String command, String variableToExtract, Map<String, Object> variableSubstituionParams,
+      boolean deleteExtractedVariable,Class<T> expectedReturnType)
+  {
+    T variableToReturn = null;
+    LOG.debug("Executing eval expression " + command + " with return type : " + expectedReturnType);
+    try {
+      for (String aKey : variableSubstituionParams.keySet()) {
+        Object keyVal = variableSubstituionParams.get(aKey);
+        if (keyVal instanceof NDimensionalArray) {
+          keyVal = ((NDimensionalArray)keyVal).toNDArray();
+        }
+        JEP_INSTANCE.set(aKey, keyVal);
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while setting the params for eval expression " + command, e);
+      return null;
+    }
+    try {
+      LOG.debug("Executing the eval expression in the interpreter instance " + command);
+      JEP_INSTANCE.eval(command);
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while evaluating the expression " + command, e);
+      return null;
+    }
+    try {
+      if (variableToExtract != null) {
+        Object extractedVariable = JEP_INSTANCE.getValue(variableToExtract);
+        if (extractedVariable instanceof NDArray) {
+          LOG.debug(" Return type is a NumPy Array. Hence converting to NDimensionalArray instance");
+          NDArray ndArrayJepVal = (NDArray)extractedVariable;
+          NDimensionalArray nDimArray = new NDimensionalArray();
+          nDimArray.setData(ndArrayJepVal.getData());
+          nDimArray.setSignedFlag(ndArrayJepVal.isUnsigned());
+          int[] dimensions = ndArrayJepVal.getDimensions();
+          nDimArray.setDimensions(dimensions);
+          int lengthInOneDimension = 1;
+          for ( int i = 0; i < dimensions.length; i++) {
+            lengthInOneDimension *= dimensions[i];
+          }
+          nDimArray.setLengthOfSequentialArray(lengthInOneDimension);
+          variableToReturn = expectedReturnType.cast(nDimArray);
+        } else {
+          variableToReturn =  expectedReturnType.cast(extractedVariable);
+        }
+        if (deleteExtractedVariable) {
+          LOG.debug("Deleting the extracted variable from the Python interpreter space");
+          JEP_INSTANCE.eval(PYTHON_DEL_COMMAND + variableToExtract);
+        }
+      }
+      LOG.debug("Deleting all the variables from the python interpreter space ");
+      for (String aKey: variableSubstituionParams.keySet()) {
+        LOG.debug("Deleting " + aKey);
+        JEP_INSTANCE.eval(PYTHON_DEL_COMMAND + aKey);
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while evaluating delete part of expression " + command, e);
+      return null;
+    }
+    return variableToReturn;
+  }
+
+  /***
+   * Stops the interpreter as requested from the operator/main thread
+   * @throws ApexPythonInterpreterException if not able to stop the
+   */
+  public void stopInterpreter() throws ApexPythonInterpreterException
+  {
+    isStopped = true;
+    LOG.info("Attempting to close the interpreter thread");
+    try {
+      JEP_INSTANCE.close();
+    } catch (Exception e) {
+      LOG.error("Error while stopping the interpreter thread ", e);
+      throw new ApexPythonInterpreterException(e);
+    }
+    LOG.info("Interpreter closed");
+  }
+
+  /***
+   * Responsible for polling the request queue and formatting the request payload to make it compatible to the
+   *  individual processing logic of the functionalities provided by the interpreter API methods.
+   * @param <T> Java templating signature enforcement
+   * @throws ApexPythonInterpreterException if an unrecognized command is issued.
+   * @throws InterruptedException if interrupted while trying to wait for a request from request queue
+   */
+  private <T> void processCommand() throws ApexPythonInterpreterException, InterruptedException
+  {
+
+    PythonRequestResponse requestResponseHandle = requestQueue.poll(timeOutToPollFromRequestQueue,
+        timeUnitsToPollFromRequestQueue);
+    if (requestResponseHandle != null) {
+      LOG.debug("Processing command " + requestResponseHandle.getPythonInterpreterRequest().getCommandType());
+      busyFlag = true;
+      if (errorEncountered) {
+        LOG.debug("Error state detected from a previous command. Resetting state to  the previous" +
+            " state of the error");
+        try {
+          JEP_INSTANCE.eval(null);
+          errorEncountered = false;
+        } catch (JepException e) {
+          LOG.error("Error while trying to clear the state of the interpreter due to previous command" +
+              " " + e.getMessage(), e);
+        }
+      }
+      PythonInterpreterRequest<T> request =
+          requestResponseHandle.getPythonInterpreterRequest();
+      PythonInterpreterResponse<T> response =
+          requestResponseHandle.getPythonInterpreterResponse();
+      Map<String,Boolean> commandStatus = new HashMap<>(1);
+      switch (request.getCommandType()) {
+        case EVAL_COMMAND:
+          EvalCommandRequestPayload evalPayload = request.getEvalCommandRequestPayload();
+          T responseVal = eval(evalPayload.getEvalCommand(), evalPayload.getVariableNameToExtractInEvalCall(),
+              evalPayload.getParamsForEvalCommand(), evalPayload.isDeleteVariableAfterEvalCall(),
+              request.getExpectedReturnType());
+          response.setResponse(responseVal);
+          if (responseVal != null) {
+            commandStatus.put(evalPayload.getEvalCommand(),Boolean.TRUE);
+          } else {
+            commandStatus.put(evalPayload.getEvalCommand(),Boolean.FALSE);
+          }
+          response.setCommandStatus(commandStatus);
+          break;
+        case SCRIPT_COMMAND:
+          ScriptExecutionRequestPayload scriptPayload = request.getScriptExecutionRequestPayload();
+          if (executeScript(scriptPayload.getScriptName())) {
+            commandStatus.put(scriptPayload.getScriptName(),Boolean.TRUE);
+          } else {
+            commandStatus.put(scriptPayload.getScriptName(),Boolean.FALSE);
+          }
+          response.setCommandStatus(commandStatus);
+          break;
+        case METHOD_INVOCATION_COMMAND:
+          MethodCallRequestPayload requestpayload = request.getMethodCallRequest();
+          response.setResponse(executeMethodCall(
+              requestpayload.getNameOfMethod(), requestpayload.getArgs(), request.getExpectedReturnType()));
+          if (response.getResponse() == null) {
+            commandStatus.put(requestpayload.getNameOfMethod(), Boolean.FALSE);
+          } else {
+            commandStatus.put(requestpayload.getNameOfMethod(), Boolean.TRUE);
+          }
+          response.setCommandStatus(commandStatus);
+          break;
+        case GENERIC_COMMANDS:
+          response.setCommandStatus(runCommands(request.getGenericCommandsRequestPayload().getGenericCommands()));
+          break;
+        default:
+          throw new ApexPythonInterpreterException(new Exception("Unspecified Interpreter command"));
+      }
+      requestResponseHandle.setRequestCompletionTime(System.currentTimeMillis());
+      responseQueue.put(requestResponseHandle);
+      LOG.debug("Submitted the response and executed " + response.getCommandStatus().size() + " instances of command");
+    }
+    busyFlag = false;
+  }
+
+  /***
+   * Starts the interpreter as soon as the thread starts running. This is due to the limitation of JEP which stipulates
+   *  that the thread which started the interpreter can only issue subsequent calls/invocations. This is due to JNI
+   *   limitations. The thread then tries to consume from the request queue and process them. If there are no requests
+   *    present then the thread can possibly go to sleep based on the {@link SpinPolicy} configured. The spin policy
+   *     is passed in as the pre init configurations. See {@link PythonInterpreterConfig} for more details
+   */
+  @Override
+  public void run()
+  {
+    LOG.info("Starting the execution of Interpreter thread ");
+    if (JEP_INSTANCE == null) {
+      LOG.info("Initializaing the interpreter state");
+      startInterpreter();
+      LOG.info("Successfully initialized the interpreter");
+    }
+    while (!isStopped) {
+      if ( (requestQueue.isEmpty()) && (spinPolicy == SpinPolicy.SLEEP)) {
+        LOG.debug("Sleeping the current thread as there are no more requests to process from the queue");
+        try {
+          Thread.sleep(sleepTimeMsInCaseOfNoRequests);
+          continue;
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      try {
+        processCommand();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    LOG.info("Stop condition detected for this thread. Stopping the in-memory interpreter now...");
+    stopInterpreter();
+  }
+
+  public Jep getJEP_INSTANCE()
+  {
+    return JEP_INSTANCE;
+  }
+
+  public void setJEP_INSTANCE(Jep JEP_INSTANCE)
+  {
+    this.JEP_INSTANCE = JEP_INSTANCE;
+  }
+
+  public long getTimeOutToPollFromRequestQueue()
+  {
+    return timeOutToPollFromRequestQueue;
+  }
+
+  public void setTimeOutToPollFromRequestQueue(long timeOutToPollFromRequestQueue)
+  {
+    this.timeOutToPollFromRequestQueue = timeOutToPollFromRequestQueue;
+  }
+
+  public TimeUnit getTimeUnitsToPollFromRequestQueue()
+  {
+    return timeUnitsToPollFromRequestQueue;
+  }
+
+  public void setTimeUnitsToPollFromRequestQueue(TimeUnit timeUnitsToPollFromRequestQueue)
+  {
+    this.timeUnitsToPollFromRequestQueue = timeUnitsToPollFromRequestQueue;
+  }
+
+  public boolean isStopped()
+  {
+    return isStopped;
+  }
+
+  public void setStopped(boolean stopped)
+  {
+    isStopped = stopped;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getRequestQueue()
+  {
+    return requestQueue;
+  }
+
+  public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
+  {
+    this.requestQueue = requestQueue;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getResponseQueue()
+  {
+    return responseQueue;
+  }
+
+  public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
+  {
+    this.responseQueue = responseQueue;
+  }
+
+  public Map<PythonInterpreterConfig, Object> getInitConfigs()
+  {
+    return initConfigs;
+  }
+
+  public void setInitConfigs(Map<PythonInterpreterConfig, Object> initConfigs)
+  {
+    this.initConfigs = initConfigs;
+  }
+
+  public boolean isBusy()
+  {
+    boolean busyState = busyFlag;
+    if (!requestQueue.isEmpty()) { // This is required because interpreter thread goes to a 1 ms sleep to allow other
+      //  threads work when checking the queue for request availability. Hence busy state flag need not necessarily
+      // be updated in this sleep window even though if there is a pending request
+      busyState = true;
+    }
+    return busyState;
+  }
+
+  public void setBusy(boolean busy)
+  {
+    busyFlag = busy;
+  }
+
+  public SpinPolicy getSpinPolicy()
+  {
+    return spinPolicy;
+  }
+
+  public void setSpinPolicy(SpinPolicy spinPolicy)
+  {
+    this.spinPolicy = spinPolicy;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java
new file mode 100644
index 0000000..d969b19
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+/***
+ * Wraps around the interpreter thread so that time bound SLAs can be implemented for python based execution
+ * This class primarily implements the time constraints by utilizing the {@link InterpreterThread} class and using
+ *  a Disruptor blocking queue for high throughput. Utilizes an executor service to implement the timing SLAs.
+ */
+public class InterpreterWrapper
+{
+  private static final Logger LOG = LoggerFactory.getLogger(InterpreterWrapper.class);
+
+  /* Reference to the interpreter thread which executes requests in memory  */
+  private transient InterpreterThread interpreterThread;
+
+  /* Spin policy to use  for the disruptor implementation */
+  private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+  private int bufferCapacity = 16; // Represents the number of workers and response queue sizes
+
+  private String interpreterId;
+
+  /* Represents the actual thread instance running under the Executor service  */
+  private transient Future<?> handleToJepRunner;
+
+  private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+  private transient BlockingQueue<PythonRequestResponse> requestQueue;
+  private transient BlockingQueue<PythonRequestResponse> responseQueue;
+
+  /* Represents the queue into which all of the stragglers will be pushed into by the interpreter thread */
+  private transient volatile BlockingQueue<PythonRequestResponse> delayedResponsesQueue;
+
+  /***
+   * Constructs the interpreter wrapper instance.
+   * @param interpreterId A string that can be used to represent the interpreter id that is passed onto the actual
+   *                      thread that is executing the commands
+   *
+   * @param delayedResponsesQueueRef The queue into which all of the straggler responses will end in
+   */
+  public InterpreterWrapper(String interpreterId,BlockingQueue<PythonRequestResponse> delayedResponsesQueueRef,
+      SpinPolicy spinPolicyForWaitingInRequestQueue)
+  {
+    delayedResponsesQueue = delayedResponsesQueueRef;
+    this.interpreterId = interpreterId;
+    this.cpuSpinPolicyForWaitingInBuffer = spinPolicyForWaitingInRequestQueue;
+    requestQueue = new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+    responseQueue =  new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+    interpreterThread = new InterpreterThread(requestQueue,responseQueue,interpreterId);
+  }
+
+  /**
+   * Invokes the interpreter thread pre initialization logic
+   * @param preInitConfigs A set of key value pairs that are used to initialize the actual interpreter. See constants
+   *                       defined in {@link InterpreterThread} for a list of keys available
+   * @throws ApexPythonInterpreterException if the pre-initialization logic could not be executed for whatever reasons
+   */
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs) throws ApexPythonInterpreterException
+  {
+    interpreterThread.preInitInterpreter(preInitConfigs);
+  }
+
+  /***
+   * Starts the actual interpreter thread to which this class is wrapping around by using an executor service
+   * @throws ApexPythonInterpreterException
+   */
+  public void startInterpreter() throws ApexPythonInterpreterException
+  {
+    handleToJepRunner = executorService.submit(interpreterThread);
+  }
+
+  /***
+   * Builds a response object for the incoming request object.
+   * @param req Represents the incoming request for which response needs to be generated for.
+   * @param windowId The Operator window ID ( used to choose the interpreter thread while choosing to execute the
+   *                  logic from a pool of worker threads )
+   * @param requestId The request ID perhaps coming from the base python operator. Only used to optimize the right
+   *                  interpreter lookup from a pool of worker interpreter threads.
+   * @param <T> The template of the return type
+   * @return An object of type {@link PythonRequestResponse} is returned which encompasses both request and response.
+   */
+  private <T> PythonRequestResponse<T> buildRequestRespObject(PythonInterpreterRequest<T> req,
+      long windowId,long requestId)
+  {
+    PythonRequestResponse<T> requestResponse = new PythonRequestResponse();
+    requestResponse.setPythonInterpreterRequest(req);
+    PythonInterpreterResponse<T> response = new PythonInterpreterResponse<>(req.getExpectedReturnType());
+    requestResponse.setPythonInterpreterResponse(response);
+    requestResponse.setRequestStartTime(System.currentTimeMillis());
+    requestResponse.setRequestId(requestId);
+    requestResponse.setWindowId(windowId);
+    return requestResponse;
+  }
+
+  /***
+   * Handles the common logic that is common across all methods of invocation of the in-memory interpreter. Some common
+   *  logic includes draining any stragglers, matching the request to the any of the responses that arrive in the
+   *   response queue possibly due to previous requests
+   * @param requestResponse The wrapper object into which
+   * @param req The request that contains the timeout SLAs
+   * @param <T> Java templating signature
+   * @return A response to the original incoming request, null if the response did not arrive within given SLA.
+   * @throws InterruptedException if interrupted while waiting for the response queue.
+   */
+  public <T> PythonRequestResponse<T> processRequest(PythonRequestResponse requestResponse,
+      PythonInterpreterRequest<T> req) throws InterruptedException
+  {
+    List<PythonRequestResponse> drainedResults = new ArrayList<>();
+    PythonRequestResponse currentRequestWithResponse = null;
+    boolean isCurrentRequestProcessed = false;
+    long timeOutInNanos = TimeUnit.NANOSECONDS.convert(req.getTimeout(),req.getTimeUnit());
+    // drain any previous responses that were returned while the Apex operator is processing
+    responseQueue.drainTo(drainedResults);
+    LOG.debug("Draining previous request responses if any " + drainedResults.size());
+    for (PythonRequestResponse oldRequestResponse : drainedResults) {
+      delayedResponsesQueue.put(oldRequestResponse);
+    }
+    // We first set a timer to see how long it actually it took for the response to arrive.
+    // It is possible that a response arrived due to a previous request and hence this need for the timer
+    // which tracks the time for the current request.
+    long currentStart = System.nanoTime();
+    long timeLeftToCompleteProcessing = timeOutInNanos;
+    while ( (!isCurrentRequestProcessed) && ( timeLeftToCompleteProcessing > 0 )) {
+      LOG.debug("Submitting the interpreter Request with time out in nanos as " + timeOutInNanos);
+      requestQueue.put(requestResponse);
+      // ensures we are blocked till the time limit
+      currentRequestWithResponse = responseQueue.poll(timeOutInNanos, TimeUnit.NANOSECONDS);
+      timeLeftToCompleteProcessing = timeLeftToCompleteProcessing - ( System.nanoTime() - currentStart );
+      currentStart = System.nanoTime();
+      if (currentRequestWithResponse != null) {
+        if ( (requestResponse.getRequestId() == currentRequestWithResponse.getRequestId()) &&
+            (requestResponse.getWindowId() == currentRequestWithResponse.getWindowId()) ) {
+          isCurrentRequestProcessed = true;
+          break;
+        } else {
+          delayedResponsesQueue.put(currentRequestWithResponse);
+        }
+      } else {
+        LOG.debug(" Processing of request could not be completed on time");
+      }
+    }
+    if (isCurrentRequestProcessed) {
+      LOG.debug("Response could be processed within time SLA");
+      return currentRequestWithResponse;
+    } else {
+      LOG.debug("Response could not be processed within time SLA");
+      return null;
+    }
+  }
+
+  /***
+   * Implements the time based SLA over the interpreters run commands implementation. See
+   *  {@link InterpreterThread#runCommands(List)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public PythonRequestResponse<Void> runCommands(long windowId, long requestId,
+      PythonInterpreterRequest<Void> request) throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.GENERIC_COMMANDS);
+    PythonRequestResponse requestResponse = buildRequestRespObject(request,windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+  /***
+   * Implements the time based SLA over the interpreters run commands implementation. See
+   *  {@link InterpreterThread#executeMethodCall(String, List, Class)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public <T> PythonRequestResponse<T> executeMethodCall(long windowId, long requestId,
+      PythonInterpreterRequest<T> request)  throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.METHOD_INVOCATION_COMMAND);
+    PythonRequestResponse requestResponse = buildRequestRespObject(request, windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+  /***
+   * Implements the time based SLA over the interpreters run commands implementation. See
+   *  {@link InterpreterThread#executeScript(String)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public PythonRequestResponse<Void> executeScript(long windowId,long requestId,PythonInterpreterRequest<Void> request)
+      throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.SCRIPT_COMMAND);
+    PythonRequestResponse<Void> requestResponse = buildRequestRespObject(request, windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+
+  /***
+   * Implements the time based SLA over the interpreters run commands implementation. See
+   *  {@link InterpreterThread#eval(String, String, Map, boolean, Class)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public <T> PythonRequestResponse<T> eval(long windowId, long requestId,PythonInterpreterRequest<T> request)
+      throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.EVAL_COMMAND);
+    PythonRequestResponse<T> requestResponse = buildRequestRespObject(request,windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+  /***
+   * Stops the interpreter
+   * @throws ApexPythonInterpreterException if error while stopping the interpreter
+   */
+  public void stopInterpreter() throws ApexPythonInterpreterException
+  {
+    interpreterThread.setStopped(true);
+    handleToJepRunner.cancel(false);
+    executorService.shutdown();
+  }
+
+  public InterpreterThread getInterpreterThread()
+  {
+    return interpreterThread;
+  }
+
+  public void setInterpreterThread(InterpreterThread interpreterThread)
+  {
+    this.interpreterThread = interpreterThread;
+  }
+
+  public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+  {
+    return cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+  {
+    this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public int getBufferCapacity()
+  {
+    return bufferCapacity;
+  }
+
+  public void setBufferCapacity(int bufferCapacity)
+  {
+    this.bufferCapacity = bufferCapacity;
+  }
+
+  public String getInterpreterId()
+  {
+    return interpreterId;
+  }
+
+  public void setInterpreterId(String interpreterId)
+  {
+    this.interpreterId = interpreterId;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getRequestQueue()
+  {
+    return requestQueue;
+  }
+
+  public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
+  {
+    this.requestQueue = requestQueue;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getResponseQueue()
+  {
+    return responseQueue;
+  }
+
+  public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
+  {
+    this.responseQueue = responseQueue;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getDelayedResponsesQueue()
+  {
+    return delayedResponsesQueue;
+  }
+
+  public void setDelayedResponsesQueue(BlockingQueue<PythonRequestResponse> delayedResponsesQueue)
+  {
+    this.delayedResponsesQueue = delayedResponsesQueue;
+  }
+
+  public boolean isCurrentlyBusy()
+  {
+    return interpreterThread.isBusy();
+  }
+
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java
new file mode 100644
index 0000000..b575042
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+import com.google.common.primitives.Ints;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/***
+ * <p>Implements the {@link ApexPythonEngine} interface by using the JEP ( Java Embedded Python ) engine. It is an
+ *  in-memory interpreter and has the following characteristics:
+ *  <ol>
+ *    <li>The python engine allows for 4 major API patterns
+ *    <ul>
+ *      <li>Execute a method call by accepting parameters to pass to the interpreter</li>
+ *      <li>Execute a python script as given in a file path</li>
+ *      <li>Evaluate an expression and allows for passing of variables between the java code and the python
+ *      in-memory interpreter bridge</li>
+ *      <li>A handy method wherein a series of instructions can be passed in one single java call ( executed as a
+ *       sequence of python eval instructions under the hood ) </li>
+ *    </ul>
+ *    <li>Automatic garbage collection of the variables that are passed from java code to the in memory python
+ *     interpreter</li>
+ *    <li>Support for all major python libraries. Tensorflow, Keras, Scikit, XGBoost</li>
+ *    <li>The python engine uses the concept of a worker thread that is responsible for executing any of the 4
+ *     patterns mentioned above. The worker thread is implemented by {@link InterpreterThread}</li>
+ *    <li>The implementation allows for SLA based execution model. i.e. the caller can stipulate that if the call is not
+ *     complete within a time out, the engine code returns back null. See {@link InterpreterWrapper}</li>
+ *    <li>Supports the concept of stragglers i.e. the processing of a request can complete eventually and the
+ *     result available from a queue called as the delayed response queue</li>
+ *    <li>Supports the concept of executing a call on all of the worker threads if required. This is to ensure the
+ *     following use cases:
+ *      <ul>
+ *        <li>Since this is an interpreter, the users can make use of an earlier calls variable definition if
+ *         need be. In such cases, the caller will have the need for a sticky thread i.e. all such calls need to
+ *          end up on the same thread.</li>
+ *        <li>Another reason is to implement the concept of Dynamic partitioning. Since interpreter accumulates
+ *         state due to commands run on it, if a new partition is introduced at runtime, this can failures for all
+ *          subsequent commands as they might depend on variables created in previous windows</li>
+ *      </ul>
+ *    </li>
+ *  </li>
+ *  </ol>
+ * </p>
+ *
+ * <p> Note that the engine implementation can be used independent of an Operator i.e. as a utility stack if need be.
+ *  Some of the API signatures need a window ID and request ID but they do not necessarily mean that the API
+ *   signatures are bound to an operator lifecycle. These parameters are used for efficient thread usage only and
+ *    the API only needs a monotonically increasing number in true sense.
+ * </p>
+ *
+ * <p>
+ *   JEP needs to be installed on all of the YARN nodes prior to the use of the JEP engine until docker support is
+ *    available for Apex. Virtual environments are not supported yet. If multiple versions of python are present
+ *     on the YARN nodes, ensure  the JVM option java.library.path is pointing to the right version of JEP which in
+ *      turn will ensure the right version of python to be used at runtime.
+ * </p>
+ *
+ * <p>
+ *   JEPPythonEngine works on the concept of a worker pool. The engine maintains a configurable number of workers and
+ *    each worker has a dedicated request and response queue. While this class is responsible for choosing the
+ *    right worker from the pool of workers for a given request , the {@link InterpreterWrapper} class is responsible
+ *     for maintaining the time bound SLAs.
+ * </p>
+ *
+ */
+public class JepPythonEngine implements ApexPythonEngine
+{
+  private static final Logger LOG = LoggerFactory.getLogger(JepPythonEngine.class);
+
+  /* Size of the worker pool */
+  private int numWorkerThreads = 3;
+
+  /* A name that can be used while logging messages and also used to set thread names */
+  private String threadGroupName;
+
+  private static final String JEP_LIBRARY_NAME = "jep";
+
+  private transient List<PythonRequestResponse> commandHistory = new ArrayList<>();
+
+  /* Spin policy for the disruptor queue implementation */
+  private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+  // Represents the number of responses that can be held in the queue
+  private int bufferCapacity = 64;
+
+  /* Time used to sleep in the beginning of the interpreter threads run i.e. start  while initializing the interpreter.
+  Note that booting of the memory interpreter can be a really heavy process depending on the libraries that
+   are being loaded and hence this variable */
+  private long sleepTimeAfterInterpreterStart = 3000; // 3 secs
+
+  /**
+   * Represents the queue into which all the stragglers are drained into
+   */
+  private transient BlockingQueue<PythonRequestResponse> delayedResponseQueue =
+      new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+
+  private List<InterpreterWrapper> workers = new ArrayList<>();
+
+  private Map<PythonInterpreterConfig, Object> preInitConfigs;
+
+  private long numStarvedReturns = 0;
+
+  /***
+   * Created the JEP Python engine instance but does not start the interpreters yet
+   * @param threadGroupName A name that represents all the workers in this thread
+   * @param numWorkerThreads Number of workers in the work pool
+   */
+  public JepPythonEngine(String threadGroupName,int numWorkerThreads)
+  {
+    this.numWorkerThreads = numWorkerThreads;
+    this.threadGroupName = threadGroupName;
+  }
+
+  private void initWorkers() throws ApexPythonInterpreterException
+  {
+    LOG.info("Attempting to load the JEP dynamic library");
+    System.loadLibrary(JEP_LIBRARY_NAME);
+    LOG.info("Successfully loaded the JEP dynamic library in memory");
+    SpinPolicy spinPolicyForReqQueue = SpinPolicy.WAITING;
+    if (preInitConfigs.containsKey(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY)) {
+      spinPolicyForReqQueue = (SpinPolicy)preInitConfigs.get(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY);
+    }
+    for ( int i = 0; i < numWorkerThreads; i++) {
+      InterpreterWrapper aWorker = new InterpreterWrapper(threadGroupName + "-" + i,delayedResponseQueue,
+          spinPolicyForReqQueue);
+      aWorker.preInitInterpreter(preInitConfigs);
+      aWorker.startInterpreter();
+      workers.add(aWorker);
+    }
+  }
+
+  /***
+   * Used to select the right worker from the work pool. The goal is to round robin the workers as far as possible.
+   *  Factors like busy workers can mean that the next available worker is chosen
+   * @param requestId Used to round robin the requests. Need not necessarily mean only an operator can use this engine.
+   * @return A worker from the worker pool. Null if all workers are busy.
+   */
+  protected InterpreterWrapper selectWorkerForCurrentCall(long requestId)
+  {
+    int slotToLookFor = Ints.saturatedCast(requestId) % numWorkerThreads;
+    LOG.debug("Slot that is being looked for in the worker pool " + slotToLookFor);
+    boolean isWorkerFound = false;
+    int numWorkersScannedForAvailability = 0;
+    InterpreterWrapper aWorker = null;
+    while ( (!isWorkerFound) && (numWorkersScannedForAvailability < numWorkerThreads)) {
+      aWorker = workers.get(slotToLookFor);
+      numWorkersScannedForAvailability  = numWorkersScannedForAvailability + 1;
+      if (!aWorker.isCurrentlyBusy()) {
+        isWorkerFound = true;
+        LOG.debug("Found worker with index as  " + slotToLookFor);
+        break;
+      } else {
+        LOG.debug("Thread ID is currently busy " + aWorker.getInterpreterId());
+        slotToLookFor = slotToLookFor + 1;
+        if ( slotToLookFor == numWorkerThreads) {
+          slotToLookFor = 0;
+        }
+      }
+    }
+    if (isWorkerFound) {
+      return aWorker;
+    } else {
+      numStarvedReturns += 1;
+      return null;
+    }
+  }
+
+  /***
+   * See {@link ApexPythonEngine#preInitInterpreter(Map)} for more details
+   * @param preInitConfigs The configuration that is going to be used by the interpreter.See constants
+   *                       defined in {@link PythonInterpreterConfig} for a list of keys available
+   * @throws ApexPythonInterpreterException if an issue while executing the pre interpreter logic
+   */
+  @Override
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
+    throws ApexPythonInterpreterException
+  {
+    this.preInitConfigs = preInitConfigs;
+  }
+
+  /***
+   * Starts all of the worker threads. Also sleeps for a few moments to ensure "fat" frameworks like Tensorflow can
+   *  be allowed to boot completely.
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public void startInterpreter() throws ApexPythonInterpreterException
+  {
+    initWorkers();
+    try {
+      if (sleepTimeAfterInterpreterStart > 0) {
+        LOG.debug("Sleeping to let the interpreter boot up in memory");
+        Thread.sleep(sleepTimeAfterInterpreterStart);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /***
+   * Used to execute all of the commands from the command history when an operator is instantiating a new instance of
+   *  the engine. Used by the dynamic partitioner to let a newly provisioned operator to catch up to the state of all of
+   *  the remaining operator instances
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public void postStartInterpreter() throws ApexPythonInterpreterException
+  {
+    for ( InterpreterWrapper wrapper : workers) {
+      for (PythonRequestResponse requestResponse : commandHistory) {
+        PythonInterpreterRequest requestPayload = requestResponse.getPythonInterpreterRequest();
+        try {
+          wrapper.processRequest(requestResponse,requestPayload);
+        } catch (InterruptedException e) {
+          throw new ApexPythonInterpreterException(e);
+        }
+      }
+    }
+  }
+
+  /***
+   * See {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   * @param executionMode Whether these commands need to be run on all worker nodes or any of the worker node
+   * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param request Represents the request to be processed.
+   * @return A map containing the command as key and boolean representing success or failure as the value.
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(request);
+    checkNotNull(request.getGenericCommandsRequestPayload(), "Run commands payload not set");
+    checkNotNull(request.getGenericCommandsRequestPayload().getGenericCommands(),
+        "Commands that need to be run not set");
+    Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        LOG.debug("Executing run commands on all of the interpreter worker threads");
+        long  timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(),request.getTimeUnit()) /
+            numWorkerThreads;
+        LOG.debug("Allocating " + timeOutPerWorker + " nanoseconds for each of the worker threads");
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        request.setTimeout(timeOutPerWorker);
+        request.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.runCommands(windowId,requestId,request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( returnStatus.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          LOG.debug("Executing run commands on a single interpreter worker thread");
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(request.hashCode() % numWorkerThreads);
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.runCommands(windowId, requestId, request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return returnStatus;
+  }
+
+  /***
+   *  See {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   *
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker
+   * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param req Represents the request to be processed.
+   * @param <T>
+   * @return
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(req);
+    checkNotNull(req.getMethodCallRequest(), "Method call info not set");
+    checkNotNull(req.getMethodCallRequest().getNameOfMethod(), "Method name not set");
+    Map<String,PythonRequestResponse<T>> returnStatus = new HashMap<>();
+    req.setCommandType(PythonCommandType.METHOD_INVOCATION_COMMAND);
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
+            numWorkerThreads;
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        req.setTimeout(timeOutPerWorker);
+        req.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.executeMethodCall(windowId,requestId,req);
+          if ( lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( returnStatus.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(req.hashCode() % numWorkerThreads);
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.executeMethodCall(windowId, requestId, req);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          } else {
+            throw new ApexPythonInterpreterException("No free interpreter threads available." +
+              " Consider increasing workers and relaunch");
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return returnStatus;
+  }
+
+  /***
+   *   See {@link ApexPythonEngine#executeScript(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   * @param executionMode  If the method call needs to be invoked on all workers or any single worker
+   * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param req Represents the request to be processed.
+   * @return
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public Map<String,PythonRequestResponse<Void>> executeScript(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<Void> req)
+    throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(req);
+    checkNotNull(req.getScriptExecutionRequestPayload(), "Script execution info not set");
+    checkNotNull(req.getScriptExecutionRequestPayload().getScriptName(), "Script name not set");
+    Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
+            numWorkerThreads;
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        req.setTimeout(timeOutPerWorker);
+        req.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.executeScript(windowId,requestId,req);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(wrapper.getInterpreterId(),lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( returnStatus.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(req.hashCode() % numWorkerThreads);
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.executeScript(windowId, requestId, req);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return returnStatus;
+  }
+
+  private void checkNotNullConditions(PythonInterpreterRequest request)
+  {
+    checkNotNull(request, "Request object cannnot be null");
+    checkNotNull(request.getTimeout(), "Time out value not set");
+    checkNotNull(request.getTimeUnit(), "Time out unit not set");
+  }
+
+  /***
+   *  See {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker
+   * @param windowId used to select the worker from the worker pool.Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param request
+   * @param <T>
+   * @return
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode,long windowId, long requestId,
+      PythonInterpreterRequest<T> request) throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(request);
+    checkNotNull(request.getEvalCommandRequestPayload(), "Eval command info not set");
+    checkNotNull(request.getEvalCommandRequestPayload().getEvalCommand(),"Eval command not set");
+    Map<String,PythonRequestResponse<T>> statusOfEval = new HashMap<>();
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(), request.getTimeUnit()) /
+            numWorkerThreads;
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        request.setTimeout(timeOutPerWorker);
+        request.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.eval(windowId,requestId,request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            statusOfEval.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        }
+        commandHistory.add(lastSuccessfullySubmittedRequest);
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(request.hashCode() % numWorkerThreads);
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.eval(windowId, requestId, request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            statusOfEval.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return statusOfEval;
+  }
+
+  @Override
+  public void stopInterpreter() throws ApexPythonInterpreterException
+  {
+    for ( InterpreterWrapper wrapper : workers) {
+      wrapper.stopInterpreter();
+    }
+  }
+
+  public int getNumWorkerThreads()
+  {
+    return numWorkerThreads;
+  }
+
+  public void setNumWorkerThreads(int numWorkerThreads)
+  {
+    this.numWorkerThreads = numWorkerThreads;
+  }
+
+  public List<InterpreterWrapper> getWorkers()
+  {
+    return workers;
+  }
+
+  public void setWorkers(List<InterpreterWrapper> workers)
+  {
+    this.workers = workers;
+  }
+
+  @Override
+  public List<PythonRequestResponse> getCommandHistory()
+  {
+    return commandHistory;
+  }
+
+  @Override
+  public void setCommandHistory(List<PythonRequestResponse> commandHistory)
+  {
+    this.commandHistory = commandHistory;
+  }
+
+  public long getSleepTimeAfterInterpreterStart()
+  {
+    return sleepTimeAfterInterpreterStart;
+  }
+
+  public void setSleepTimeAfterInterpreterStart(long sleepTimeAfterInterpreterStart)
+  {
+    this.sleepTimeAfterInterpreterStart = sleepTimeAfterInterpreterStart;
+  }
+
+  @Override
+  public BlockingQueue<PythonRequestResponse> getDelayedResponseQueue()
+  {
+    return delayedResponseQueue;
+  }
+
+  @Override
+  public void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue)
+  {
+    this.delayedResponseQueue = delayedResponseQueue;
+  }
+
+  public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+  {
+    return cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+  {
+    this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public int getBufferCapacity()
+  {
+    return bufferCapacity;
+  }
+
+  public void setBufferCapacity(int bufferCapacity)
+  {
+    this.bufferCapacity = bufferCapacity;
+  }
+
+  @Override
+  public long getNumStarvedReturns()
+  {
+    return numStarvedReturns;
+  }
+
+  @Override
+  public void setNumStarvedReturns(long numStarvedReturns)
+  {
+    this.numStarvedReturns = numStarvedReturns;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java
new file mode 100644
index 0000000..dd75956
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+public enum SpinPolicy
+{
+  SLEEP,
+  BUSY_SPIN
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java
new file mode 100644
index 0000000..9b05d73
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.jep;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java
new file mode 100644
index 0000000..ba84249
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java
new file mode 100644
index 0000000..6312b29
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.util.KryoCloneUtils;
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+
+/***
+ * Abstract partitioner that can be used in partitioning instances of the BasePythonExecution operator. This
+ *  class does not do anything meaningful. See {@link ThreadStarvationBasedPartitioner} for details.
+ */
+public abstract class AbstractPythonExecutionPartitioner implements Partitioner<BasePythonExecutionOperator>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractPythonExecutionPartitioner.class);
+
+  @JsonIgnore
+  protected BasePythonExecutionOperator prototypePythonOperator;
+
+  public AbstractPythonExecutionPartitioner(BasePythonExecutionOperator prototypePythonOperator)
+  {
+    this.prototypePythonOperator = prototypePythonOperator;
+  }
+
+  @Override
+  public Collection<Partition<BasePythonExecutionOperator>> definePartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    List<Partition<BasePythonExecutionOperator>> requiredPartitions = buildTargetPartitions(partitions, context);
+    return requiredPartitions;
+  }
+
+  protected abstract List<Partition<BasePythonExecutionOperator>> buildTargetPartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context);
+
+  @Override
+  public void partitioned(Map<Integer, Partition<BasePythonExecutionOperator>> partitions)
+  {
+
+  }
+
+  public Partitioner.Partition<BasePythonExecutionOperator> clonePartition()
+  {
+    Partitioner.Partition<BasePythonExecutionOperator> clonedKuduInputOperator =
+        new DefaultPartition<>(KryoCloneUtils.cloneObject(prototypePythonOperator));
+    return clonedKuduInputOperator;
+  }
+}
+
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java
new file mode 100644
index 0000000..3acb32c
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+public enum PythonExecutionPartitionerType
+{
+  THREAD_STARVATION_BASED;
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java
new file mode 100644
index 0000000..f93ead9
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+/***
+ * A partitioner that adds another instance of the python operator when the percent of non-serviced requests within any
+ *  two checkpoint boundaries exceed a certain threshold. The threshold for triggering is configurable and is expressed
+ *   as a percentage.
+ */
+public class ThreadStarvationBasedPartitioner extends AbstractPythonExecutionPartitioner
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadStarvationBasedPartitioner.class);
+
+  private float threadStarvationThresholdRatio;
+
+  public ThreadStarvationBasedPartitioner(BasePythonExecutionOperator prototypePythonOperator)
+  {
+    super(prototypePythonOperator);
+  }
+
+  /***
+   * Calculates the partitions that are required based on the starvations encountered for each checkpoint state. The
+   *  new instance is fed with the command history of the original operator ( if any ) so that the new instance can
+   *   be in the same state of the original operator when it starts processing the new tuples.
+   * @param partitions The current set of partitions
+   * @param context The partitioning context
+   * @return The new set of partitioned instances keeping the old ones in tact and rebuilding only new ones if needed.
+   */
+  @Override
+  protected List<Partition<BasePythonExecutionOperator>> buildTargetPartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    List<Partition<BasePythonExecutionOperator>> returnList = new ArrayList<>();
+    if (partitions != null) {
+      returnList.addAll(partitions);
+      for (Partition<BasePythonExecutionOperator> aCurrentPartition : partitions) {
+        BasePythonExecutionOperator anOperator = aCurrentPartition.getPartitionedInstance();
+        long starvedCount = anOperator.getNumStarvedReturns();
+        long requestsForCheckpointWindow = anOperator.getNumberOfRequestsProcessedPerCheckpoint();
+        if ( requestsForCheckpointWindow != 0) { // when the operator is starting for the first time
+          float starvationPercent = 100 - ( ((requestsForCheckpointWindow - starvedCount ) /
+              requestsForCheckpointWindow) * 100);
+          if (starvationPercent > anOperator.getStarvationPercentBeforeSpawningNewInstance()) {
+            LOG.info("Creating a new instance of the python operator as starvation % is " + starvationPercent);
+            Partition<BasePythonExecutionOperator> newInstance = clonePartition();
+            List<PythonRequestResponse> commandHistory = new ArrayList<>();
+            commandHistory.addAll(anOperator.getAccumulatedCommandHistory());
+            newInstance.getPartitionedInstance().setAccumulatedCommandHistory(commandHistory);
+            returnList.add(newInstance);
+          }
+        }
+      }
+    }
+    return returnList;
+  }
+
+  public float getThreadStarvationThresholdRatio()
+  {
+    return threadStarvationThresholdRatio;
+  }
+
+  public void setThreadStarvationThresholdRatio(float threadStarvationThresholdRatio)
+  {
+    this.threadStarvationThresholdRatio = threadStarvationThresholdRatio;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java
new file mode 100644
index 0000000..94b3e37
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.partitioner;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java
new file mode 100644
index 0000000..98aaff5
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.Map;
+
+public class EvalCommandRequestPayload
+{
+  private boolean deleteVariableAfterEvalCall;
+
+  private String variableNameToExtractInEvalCall;
+
+  private String evalCommand;
+
+  private Map<String,Object> paramsForEvalCommand;
+
+  public boolean isDeleteVariableAfterEvalCall()
+  {
+    return deleteVariableAfterEvalCall;
+  }
+
+  public void setDeleteVariableAfterEvalCall(boolean deleteVariableAfterEvalCall)
+  {
+    this.deleteVariableAfterEvalCall = deleteVariableAfterEvalCall;
+  }
+
+  public String getVariableNameToExtractInEvalCall()
+  {
+    return variableNameToExtractInEvalCall;
+  }
+
+  public void setVariableNameToExtractInEvalCall(String variableNameToExtractInEvalCall)
+  {
+    this.variableNameToExtractInEvalCall = variableNameToExtractInEvalCall;
+  }
+
+  public String getEvalCommand()
+  {
+    return evalCommand;
+  }
+
+  public void setEvalCommand(String evalCommand)
+  {
+    this.evalCommand = evalCommand;
+  }
+
+  public Map<String, Object> getParamsForEvalCommand()
+  {
+    return paramsForEvalCommand;
+  }
+
+  public void setParamsForEvalCommand(Map<String, Object> paramsForEvalCommand)
+  {
+    this.paramsForEvalCommand = paramsForEvalCommand;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java
new file mode 100644
index 0000000..b037248
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.List;
+
+public class GenericCommandsRequestPayload
+{
+  List<String> genericCommands;
+
+  public List<String> getGenericCommands()
+  {
+    return genericCommands;
+  }
+
+  public void setGenericCommands(List<String> genericCommands)
+  {
+    this.genericCommands = genericCommands;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java
new file mode 100644
index 0000000..368cff9
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.List;
+
+public class MethodCallRequestPayload
+{
+  private String nameOfMethod;
+
+  private List<Object> args;
+
+  public String getNameOfMethod()
+  {
+    return nameOfMethod;
+  }
+
+  public void setNameOfMethod(String nameOfMethod)
+  {
+    this.nameOfMethod = nameOfMethod;
+  }
+
+  public List<Object> getArgs()
+  {
+    return args;
+  }
+
+  public void setArgs(List<Object> args)
+  {
+    this.args = args;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java
new file mode 100644
index 0000000..3721b84
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+public enum PythonCommandType
+{
+  GENERIC_COMMANDS,
+  EVAL_COMMAND,
+  SCRIPT_COMMAND,
+  METHOD_INVOCATION_COMMAND
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java
new file mode 100644
index 0000000..1b8a46f
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.concurrent.TimeUnit;
+
+public class PythonInterpreterRequest<T>
+{
+  PythonCommandType commandType;
+
+  long timeout;
+
+  TimeUnit timeUnit;
+
+  MethodCallRequestPayload methodCallRequest;
+
+  GenericCommandsRequestPayload genericCommandsRequestPayload;
+
+  EvalCommandRequestPayload evalCommandRequestPayload;
+
+  ScriptExecutionRequestPayload scriptExecutionRequestPayload;
+
+  Class<T> expectedReturnType;
+
+  // This constructor is not to be used by the user code and only exists for Kryo serialization
+  public PythonInterpreterRequest()
+  {
+  }
+
+  public PythonInterpreterRequest(Class<T> expectedReturnType)
+  {
+    this.expectedReturnType = expectedReturnType;
+  }
+
+  public PythonCommandType getCommandType()
+  {
+    return commandType;
+  }
+
+  public void setCommandType(PythonCommandType commandType)
+  {
+    this.commandType = commandType;
+  }
+
+  public MethodCallRequestPayload getMethodCallRequest()
+  {
+    return methodCallRequest;
+  }
+
+  public void setMethodCallRequest(MethodCallRequestPayload methodCallRequest)
+  {
+    this.methodCallRequest = methodCallRequest;
+  }
+
+  public GenericCommandsRequestPayload getGenericCommandsRequestPayload()
+  {
+    return genericCommandsRequestPayload;
+  }
+
+  public void setGenericCommandsRequestPayload(GenericCommandsRequestPayload genericCommandsRequestPayload)
+  {
+    this.genericCommandsRequestPayload = genericCommandsRequestPayload;
+  }
+
+  public EvalCommandRequestPayload getEvalCommandRequestPayload()
+  {
+    return evalCommandRequestPayload;
+  }
+
+  public void setEvalCommandRequestPayload(EvalCommandRequestPayload evalCommandRequestPayload)
+  {
+    this.evalCommandRequestPayload = evalCommandRequestPayload;
+  }
+
+  public ScriptExecutionRequestPayload getScriptExecutionRequestPayload()
+  {
+    return scriptExecutionRequestPayload;
+  }
+
+  public void setScriptExecutionRequestPayload(ScriptExecutionRequestPayload scriptExecutionRequestPayload)
+  {
+    this.scriptExecutionRequestPayload = scriptExecutionRequestPayload;
+  }
+
+  public Class<T> getExpectedReturnType()
+  {
+    return expectedReturnType;
+  }
+
+  public void setExpectedReturnType(Class<T> expectedReturnType)
+  {
+    this.expectedReturnType = expectedReturnType;
+  }
+
+  public long getTimeout()
+  {
+    return timeout;
+  }
+
+  public void setTimeout(long timeout)
+  {
+    this.timeout = timeout;
+  }
+
+  public TimeUnit getTimeUnit()
+  {
+    return timeUnit;
+  }
+
+  public void setTimeUnit(TimeUnit timeUnit)
+  {
+    this.timeUnit = timeUnit;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java
new file mode 100644
index 0000000..64dc1a5
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.Map;
+
+public class PythonInterpreterResponse<T>
+{
+
+  Class<T> responseTypeClass;
+
+  Map<String,Boolean> commandStatus;
+
+  // To be used only by the Kryo serializer framework
+  public PythonInterpreterResponse()
+  {
+  }
+
+  public PythonInterpreterResponse(Class<T> responseTypeClassHandle)
+  {
+    responseTypeClass = responseTypeClassHandle;
+  }
+
+  T response;
+
+  public T getResponse()
+  {
+    return response;
+  }
+
+  public void setResponse(T response)
+  {
+    this.response = response;
+  }
+
+  public Map<String, Boolean> getCommandStatus()
+  {
+    return commandStatus;
+  }
+
+  public void setCommandStatus(Map<String, Boolean> commandStatus)
+  {
+    this.commandStatus = commandStatus;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java
new file mode 100644
index 0000000..9e3958c
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+
+public class PythonRequestResponse<T>
+{
+  PythonInterpreterRequest pythonInterpreterRequest;
+
+  PythonInterpreterResponse pythonInterpreterResponse;
+
+  long requestId;
+
+  long windowId;
+
+  long requestStartTime;
+
+  long requestCompletionTime;
+
+  public PythonInterpreterRequest getPythonInterpreterRequest()
+  {
+    return pythonInterpreterRequest;
+  }
+
+  public void setPythonInterpreterRequest(PythonInterpreterRequest pythonInterpreterRequest)
+  {
+    this.pythonInterpreterRequest = pythonInterpreterRequest;
+  }
+
+  public PythonInterpreterResponse getPythonInterpreterResponse()
+  {
+    return pythonInterpreterResponse;
+  }
+
+  public void setPythonInterpreterResponse(PythonInterpreterResponse pythonInterpreterResponse)
+  {
+    this.pythonInterpreterResponse = pythonInterpreterResponse;
+  }
+
+  public long getRequestId()
+  {
+    return requestId;
+  }
+
+  public void setRequestId(long requestId)
+  {
+    this.requestId = requestId;
+  }
+
+  public long getWindowId()
+  {
+    return windowId;
+  }
+
+  public void setWindowId(long windowId)
+  {
+    this.windowId = windowId;
+  }
+
+  public long getRequestStartTime()
+  {
+    return requestStartTime;
+  }
+
+  public void setRequestStartTime(long requestStartTime)
+  {
+    this.requestStartTime = requestStartTime;
+  }
+
+  public long getRequestCompletionTime()
+  {
+    return requestCompletionTime;
+  }
+
+  public void setRequestCompletionTime(long requestCompletionTime)
+  {
+    this.requestCompletionTime = requestCompletionTime;
+  }
+}
+
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java
new file mode 100644
index 0000000..403c73d
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+public class ScriptExecutionRequestPayload
+{
+  String scriptName;
+
+  public String getScriptName()
+  {
+    return scriptName;
+  }
+
+  public void setScriptName(String scriptName)
+  {
+    this.scriptName = scriptName;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java
new file mode 100644
index 0000000..dce69a6
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java
new file mode 100644
index 0000000..839f493
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import jep.NDArray;
+
+/***
+ * A handy Kryo serializer class that can be used to serialize and deserialize a JEP NDArray instance. It is
+ *  recommended that {@link NDimensionalArray} be used in lieu of this class. This is because NDArray is highly specific
+ *   JEP data structure and will not give flexibility if the python engines are changed in the future.
+ */
+public class NDArrayKryoSerializer extends Serializer<NDArray>
+{
+  private static final short TRUE_AS_SHORTINT = 1;
+
+  private static final short FALSE_AS_SHORTINT = 0;
+
+  @Override
+  public void setGenerics(Kryo kryo, Class[] generics)
+  {
+    super.setGenerics(kryo, generics);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, NDArray ndArray)
+  {
+
+    Object dataVal = ndArray.getData();
+    if (dataVal == null) {
+      return;
+    }
+    // NDArray throws an exception in constructor if not an array. So below value will never be null
+    Class classNameForArrayType = dataVal.getClass().getComponentType(); // null if it is not an array
+    int[] dimensions = ndArray.getDimensions();
+    boolean signedFlag = ndArray.isUnsigned();
+    int arraySizeInSingleDimension = 1;
+    for (int aDimensionSize : dimensions) {
+      arraySizeInSingleDimension = arraySizeInSingleDimension * aDimensionSize;
+    }
+    // write the single dimension length
+    output.writeInt(arraySizeInSingleDimension);
+    // write the dimension of the dimensions int array
+    output.writeInt(dimensions.length);
+    // next we write the dimensions array itself
+    output.writeInts(dimensions);
+
+    // next write the unsigned flag
+    output.writeBoolean(signedFlag);
+
+    // write the data type of the N-dimensional Array
+    if (classNameForArrayType != null) {
+      output.writeString(classNameForArrayType.getCanonicalName());
+    } else {
+      output.writeString(null);
+    }
+
+    // write the array contents
+    if (dataVal != null) {
+      switch (classNameForArrayType.getCanonicalName()) {
+        case "float":
+          output.writeFloats((float[])dataVal);
+          break;
+        case "int":
+          output.writeInts((int[])dataVal);
+          break;
+        case "double":
+          output.writeDoubles((double[])dataVal);
+          break;
+        case "long":
+          output.writeLongs((long[])dataVal);
+          break;
+        case "short":
+          output.writeShorts((short[])dataVal);
+          break;
+        case "byte":
+          output.writeBytes((byte[])dataVal);
+          break;
+        case "boolean":
+          boolean[] originalBoolArray = (boolean[])dataVal;
+          short[] convertedBoolArray = new short[originalBoolArray.length];
+          for (int i = 0; i < originalBoolArray.length; i++) {
+            if (originalBoolArray[i]) {
+              convertedBoolArray[i] = TRUE_AS_SHORTINT;
+            } else {
+              convertedBoolArray[i] = FALSE_AS_SHORTINT;
+            }
+          }
+          output.writeShorts(convertedBoolArray);
+          break;
+        default:
+          throw new RuntimeException("Unsupported NDArray type serialization object");
+      }
+    }
+  }
+
+  @Override
+  public NDArray read(Kryo kryo, Input input, Class<NDArray> aClass)
+  {
+    int singleDimensionArrayLength = input.readInt();
+    int lengthOfDimensionsArray = input.readInt();
+    int[] dimensions = input.readInts(lengthOfDimensionsArray);
+    boolean signedFlag = input.readBoolean();
+
+    String dataType = input.readString();
+    if ( dataType == null) {
+      return null;
+    }
+    switch (dataType) {
+      case "float":
+        NDArray<float[]> floatNDArray = new NDArray<>(
+            input.readFloats(singleDimensionArrayLength),signedFlag,dimensions);
+        return floatNDArray;
+      case "int":
+        NDArray<int[]> intNDArray = new NDArray<>(
+            input.readInts(singleDimensionArrayLength),signedFlag,dimensions);
+        return intNDArray;
+      case "double":
+        NDArray<double[]> doubleNDArray = new NDArray<>(
+            input.readDoubles(singleDimensionArrayLength),signedFlag,dimensions);
+        return doubleNDArray;
+      case "long":
+        NDArray<long[]> longNDArray = new NDArray<>(
+            input.readLongs(singleDimensionArrayLength),signedFlag,dimensions);
+        return longNDArray;
+      case "short":
+        NDArray<short[]> shortNDArray = new NDArray<>(
+            input.readShorts(singleDimensionArrayLength),signedFlag,dimensions);
+        return shortNDArray;
+      case "byte":
+        NDArray<byte[]> byteNDArray = new NDArray<>(
+            input.readBytes(singleDimensionArrayLength),signedFlag,dimensions);
+        return byteNDArray;
+      case "boolean":
+        short[] shortsArray = input.readShorts(singleDimensionArrayLength);
+        boolean[] boolsArray = new boolean[shortsArray.length];
+        for (int i = 0; i < shortsArray.length; i++) {
+          if (TRUE_AS_SHORTINT == shortsArray[i]) {
+            boolsArray[i] = true;
+          } else {
+            boolsArray[i] = false;
+          }
+        }
+        NDArray<boolean[]> booleanNDArray = new NDArray<>(boolsArray,signedFlag,dimensions);
+        return booleanNDArray;
+      default:
+        return null;
+    }
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java
new file mode 100644
index 0000000..c233855
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import jep.NDArray;
+
+/***
+ * Represents a wrapper around a numpy array. The way to build a numpy array is by first creating a single
+ *  dimensional array and then specifying the dimensions. The dimensions specify how the single dimension will slice
+ *   the single dimensional array.
+ * @param <T> The data type of the single dimensional array. For example for a numpy array of type float, T will be
+ *           of type float[].
+ *           <p>
+ *           Only the following types are supported:
+ *          <ol>
+ *           <li>float[]</li>
+ *           <li>int[]</li>
+ *           <li>double[]</li>
+ *           <li>long[]</li>
+ *           <li>short[]</li>
+ *           <li>byte[]</li>
+ *           <li>boolean[]</li>
+ *          </ol>
+ *           </p>
+ *           <p>No support for complex types. See example application in test modules for code snippets & usage</p>
+ */
+public class NDimensionalArray<T>
+{
+
+  int[] dimensions;
+
+  T data;
+
+  int lengthOfSequentialArray;
+
+  boolean signedFlag;
+
+  public NDimensionalArray()
+  {
+  }
+
+  public NDArray<T> toNDArray()
+  {
+    return new NDArray<T>(data,signedFlag,dimensions);
+  }
+
+  public int[] getDimensions()
+  {
+    return dimensions;
+  }
+
+  public void setDimensions(int[] dimensions)
+  {
+    this.dimensions = dimensions;
+  }
+
+  public int getLengthOfSequentialArray()
+  {
+    return lengthOfSequentialArray;
+  }
+
+  public void setLengthOfSequentialArray(int lengthOfSequentialArray)
+  {
+    this.lengthOfSequentialArray = lengthOfSequentialArray;
+  }
+
+  public boolean isSignedFlag()
+  {
+    return signedFlag;
+  }
+
+  public void setSignedFlag(boolean signedFlag)
+  {
+    this.signedFlag = signedFlag;
+  }
+
+  public T getData()
+  {
+    return data;
+  }
+
+  public void setData(T data)
+  {
+    this.data = data;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java
new file mode 100644
index 0000000..c998c20
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.GenericCommandsRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+
+/*** A handy utility class that implements boiler plate code while building request objects. Only commonly
+ * used ones are implemented.
+ *
+ */
+public class PythonRequestResponseUtil
+{
+  /***
+   * Builds the request object for run commands API request. See
+   * {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details
+   * @param commands
+   * @param timeOut timeout for the request to complete
+   * @param timeUnit Time units
+   * @return A request object that can be passed to the Python Engine API for run commands
+   */
+  public static PythonInterpreterRequest<Void> buildRequestObjectForRunCommands(List<String> commands, long timeOut,
+      TimeUnit timeUnit)
+  {
+    GenericCommandsRequestPayload genericCommandsRequestPayload = new GenericCommandsRequestPayload();
+    genericCommandsRequestPayload.setGenericCommands(commands);
+    PythonInterpreterRequest<Void> request = new PythonInterpreterRequest<>(Void.class);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setGenericCommandsRequestPayload(genericCommandsRequestPayload);
+    return request;
+  }
+
+  /***
+   * Builds the request object for the Eval command request. See
+   * {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details
+   * @param evalCommand The eval expression
+   * @param evalParams Variables that need to be substituted
+   * @param varNameToExtract The name of variable to extract if any after the expression is evaluated. Can be null
+   * @param deleteVarAfterExtract The name of the variable to delete if any. null allowed
+   * @param timeOut Timeout for the API to complete processing
+   * @param timeUnit Units of time for the time out variable
+   * @param clazz The Class that represents the return type
+   * @param <T> Template construct for Java type inference
+   * @return The request object that can be used for the Eval command
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForEvalCommand(
+      String evalCommand, Map<String,Object> evalParams, String varNameToExtract,
+      boolean deleteVarAfterExtract, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    EvalCommandRequestPayload evalCommandRequestPayload = new EvalCommandRequestPayload();
+    evalCommandRequestPayload.setEvalCommand(evalCommand);
+    evalCommandRequestPayload.setVariableNameToExtractInEvalCall(varNameToExtract);
+    evalCommandRequestPayload.setParamsForEvalCommand(evalParams);
+    evalCommandRequestPayload.setDeleteVariableAfterEvalCall(deleteVarAfterExtract);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setEvalCommandRequestPayload(evalCommandRequestPayload);
+    return request;
+  }
+
+  /***
+   * Builds the request object for the Method call command. See
+   * {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details
+   * @param methodName Name of the method
+   * @param methodParams parames to the method
+   * @param timeOut Time allocated for completing the API
+   * @param timeUnit The units of time
+   * @param clazz The Class that represents the return type
+   * @param <T> Java templating signature
+   * @return The request object that can be used for method calls
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForMethodCallCommand(
+      String methodName, List<Object> methodParams, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    MethodCallRequestPayload methodCallRequestPayload = new MethodCallRequestPayload();
+    methodCallRequestPayload.setNameOfMethod(methodName);
+    methodCallRequestPayload.setArgs(methodParams);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setMethodCallRequest(methodCallRequestPayload);
+    return request;
+  }
+
+  /***
+   * Builds a request object that can be used for executing the script call commands.
+   * @param scriptPath Full path to the file name containing the script
+   * @param timeOut The time that can be used to complete the execution of the script
+   * @param timeUnit Unit of time for time out parameter
+   * @param clazz The class that can be used to represent the return type
+   * @param <T> Java template for type inference
+   * @return The Request object that can be used for a script call invocation
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForScriptCallCommand(
+      String scriptPath, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    ScriptExecutionRequestPayload scriptExecutionRequestPayload = new ScriptExecutionRequestPayload();
+    scriptExecutionRequestPayload.setScriptName(scriptPath);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setScriptExecutionRequestPayload(scriptExecutionRequestPayload);
+    return request;
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java
new file mode 100644
index 0000000..9a38d81
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.python;
+
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.function.FunctionOperator;
+import org.apache.apex.malhar.lib.function.FunctionOperatorUtil;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+@ApplicationAnnotation(name = "PythonOperatorExample")
+/**
+ * @since 3.8.0
+ */
+public class PythonExecutorApplication implements StreamingApplication
+{
+  @VisibleForTesting
+  Function.MapFunction<Object, ?> outputFn = FunctionOperatorUtil.CONSOLE_SINK_FN;
+
+  @VisibleForTesting
+  PythonPayloadPOJOGenerator pojoDataGenerator;
+
+  @VisibleForTesting
+  SimplePythonOpOperator simplePythonOpOperator;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    pojoDataGenerator = dag.addOperator("Input", new PythonPayloadPOJOGenerator());
+    simplePythonOpOperator = dag.addOperator("pythonprocessor", new SimplePythonOpOperator());
+    FunctionOperator.MapFunctionOperator<Object, ?> output = dag.addOperator("out",
+        new FunctionOperator.MapFunctionOperator<>(outputFn));
+    dag.addStream("InputToPython", pojoDataGenerator.output, simplePythonOpOperator.input);
+    dag.addStream("PythonToOutput", simplePythonOpOperator.outputPort, output.input);
+
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java
new file mode 100644
index 0000000..fe3d69b
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+import org.apache.apex.malhar.python.test.PythonAvailabilityTestRule;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+
+import static org.junit.Assert.assertEquals;
+
+public class PythonExecutorApplicationTest
+{
+  private static final List<Object> results = Collections.synchronizedList(new ArrayList<>());
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(PythonExecutorApplicationTest.class);
+
+  @Rule
+  public PythonAvailabilityTestRule jepAvailabilityBasedTest = new PythonAvailabilityTestRule();
+
+
+  @SuppressWarnings("serial")
+  private static final Function.MapFunction<Object, Void> outputFn = new Function.MapFunction<Object, Void>()
+  {
+    @Override
+    public Void f(Object input)
+    {
+      results.add(input);
+      return null;
+    }
+  };
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testApplication() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(Launcher.LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
+    PythonExecutorApplication pythonExecutorApplication = new PythonExecutorApplication();
+    pythonExecutorApplication.outputFn =  outputFn;
+    Launcher.AppHandle appHandle = launcher.launchApp(pythonExecutorApplication, conf, launchAttributes);
+    int sleepTimeCounterForLoopExit = 0;
+    int sleepTimePerIteration = 1000;
+    // wait until expected result count or timeout
+    while (results.size() < pythonExecutorApplication.pojoDataGenerator.getMaxTuples()) {
+      sleepTimeCounterForLoopExit += sleepTimePerIteration;
+      if (sleepTimeCounterForLoopExit > 30000) {
+        break;
+      }
+      LOG.info("Test sleeping until the application time out is reached");
+      Thread.sleep(sleepTimePerIteration);
+    }
+    appHandle.shutdown(Launcher.ShutdownMode.KILL);
+    assertEquals(pythonExecutorApplication.pojoDataGenerator.getMaxTuples(), results.size());
+  }
+
+}
+
+
+
+
+
+
+
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java b/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java
new file mode 100644
index 0000000..6996d94
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.python;
+
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Generates and emits a simple float and int for python operator to consume
+ *
+ * @since 3.8.0
+ */
+public class PythonPayloadPOJOGenerator implements InputOperator
+{
+  private long tuplesCounter = 0;
+  private long currentWindowTuplesCounter = 0;
+
+  // Limit number of emitted tuples per window
+  @Min(1)
+  private long maxTuplesPerWindow = 150;
+
+  @Min(1)
+  private long maxTuples = 300;
+
+  private final Random random = new Random();
+
+  private static final int MAX_RANDOM_INT = 100;
+
+  public static final int DIMENSION_SIZE = 2;
+
+  public static int[] intDimensionSums = new int[ DIMENSION_SIZE * DIMENSION_SIZE ];
+
+  public static float[] floatDimensionSums = new float[DIMENSION_SIZE * DIMENSION_SIZE];
+
+  public final transient DefaultOutputPort<PythonProcessingPojo> output = new DefaultOutputPort<>();
+
+  public PythonPayloadPOJOGenerator()
+  {
+    for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
+      intDimensionSums[i] = 0;
+      floatDimensionSums[i] = 0;
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowTuplesCounter = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    while ( ( currentWindowTuplesCounter < maxTuplesPerWindow) && (tuplesCounter < maxTuples) ) {
+      PythonProcessingPojo pythonProcessingPojo = new PythonProcessingPojo();
+      pythonProcessingPojo.setX(random.nextInt(MAX_RANDOM_INT));
+      pythonProcessingPojo.setY(random.nextFloat());
+
+      float[] f = new float[( DIMENSION_SIZE * DIMENSION_SIZE)];
+      for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
+        f[i] = random.nextFloat();
+        floatDimensionSums[ i % DIMENSION_SIZE ] = floatDimensionSums[ i % DIMENSION_SIZE ] + f[i];
+      }
+      NDimensionalArray<float[]> nDimensionalFloatArray = new NDimensionalArray<>();
+      nDimensionalFloatArray.setData(f);
+      nDimensionalFloatArray.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
+      nDimensionalFloatArray.setLengthOfSequentialArray(floatDimensionSums.length);
+      nDimensionalFloatArray.setSignedFlag(false);
+      pythonProcessingPojo.setNumpyFloatArray(nDimensionalFloatArray);
+
+      int[] ints = new int[( DIMENSION_SIZE * DIMENSION_SIZE)];
+      for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
+        ints[i] = random.nextInt(MAX_RANDOM_INT);
+        intDimensionSums[ i % DIMENSION_SIZE ] = intDimensionSums [ i % DIMENSION_SIZE ] + ints[i];
+      }
+      NDimensionalArray<int[]> nDimensionalIntArray = new NDimensionalArray<>();
+      nDimensionalIntArray.setData(ints);
+      nDimensionalIntArray.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
+      nDimensionalIntArray.setLengthOfSequentialArray(ints.length);
+      nDimensionalIntArray.setSignedFlag(false);
+      pythonProcessingPojo.setNumpyIntArray(nDimensionalIntArray);
+      output.emit(pythonProcessingPojo);
+      currentWindowTuplesCounter += 1;
+      tuplesCounter += 1;
+    }
+  }
+
+  public long getMaxTuples()
+  {
+    return maxTuples;
+  }
+
+  public void setMaxTuples(long maxTuples)
+  {
+    this.maxTuples = maxTuples;
+  }
+
+  public long getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  public static int[] getIntDimensionSums()
+  {
+    return intDimensionSums;
+  }
+
+  public static void setIntDimensionSums(int[] intDimensionSums)
+  {
+    PythonPayloadPOJOGenerator.intDimensionSums = intDimensionSums;
+  }
+
+  public static float[] getFloatDimensionSums()
+  {
+    return floatDimensionSums;
+  }
+
+  public static void setFloatDimensionSums(float[] floatDimensionSums)
+  {
+    PythonPayloadPOJOGenerator.floatDimensionSums = floatDimensionSums;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java b/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java
new file mode 100644
index 0000000..33c3452
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+public class PythonProcessingPojo
+{
+
+  private float y;
+
+  private int x;
+
+
+  private NDimensionalArray<float[]> numpyFloatArray;
+
+  private NDimensionalArray<int[]> numpyIntArray;
+
+
+  public float getY()
+  {
+    return y;
+  }
+
+  public void setY(float y)
+  {
+    this.y = y;
+  }
+
+  public int getX()
+  {
+    return x;
+  }
+
+  public void setX(int x)
+  {
+    this.x = x;
+  }
+
+  public NDimensionalArray<float[]> getNumpyFloatArray()
+  {
+    return numpyFloatArray;
+  }
+
+  public void setNumpyFloatArray(NDimensionalArray<float[]> numpyFloatArray)
+  {
+    this.numpyFloatArray = numpyFloatArray;
+  }
+
+  public NDimensionalArray<int[]> getNumpyIntArray()
+  {
+    return numpyIntArray;
+  }
+
+  public void setNumpyIntArray(NDimensionalArray<int[]> numpyIntArray)
+  {
+    this.numpyIntArray = numpyIntArray;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java b/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java
new file mode 100644
index 0000000..69ece35
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.jep.SpinPolicy;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+import org.apache.apex.malhar.python.base.util.PythonRequestResponseUtil;
+
+public class SimplePythonOpOperator extends BasePythonExecutionOperator<PythonProcessingPojo>
+{
+  private Map<String,PythonRequestResponse<NDimensionalArray>> lastKnownResponse;
+
+  @Override
+  public Map<PythonInterpreterConfig, Object> getPreInitConfigurations()
+  {
+    Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+    Set<String> sharedLibsList = new HashSet<>();
+    sharedLibsList.add("numpy");
+    preInitConfigs.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibsList);
+    // Next two configs allow for a very low latency mode wherein the cost of CPU is sacrificed for low latencies
+    // Defaults are saner and the following config is overriding
+    preInitConfigs.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, "" + SpinPolicy.BUSY_SPIN.name());
+    preInitConfigs.put(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY,
+        com.conversantmedia.util.concurrent.SpinPolicy.SPINNING);
+    return preInitConfigs;
+  }
+
+  @Override
+  public PythonRequestResponse processPythonCodeForIncomingTuple(PythonProcessingPojo input, ApexPythonEngine pythonEngineRef)
+    throws ApexPythonInterpreterException
+  {
+    Map<String,Object> evalParams = new HashMap<>();
+    evalParams.put("intArrayToAdd",input.getNumpyIntArray());
+    evalParams.put("floatArrayToAdd",input.getNumpyFloatArray());
+    // Not assigning to any var as this results in output printed on the console which can be a validation of redirect
+    String evalCommand = "npval=np.add(intMatrix,intArrayToAdd)";
+    //String evalCommand = "print(type(intArrayToAdd))";
+    PythonInterpreterRequest<NDimensionalArray> request = PythonRequestResponseUtil.buildRequestForEvalCommand(
+        evalCommand,evalParams,"intMatrix",false, 20,
+        TimeUnit.MILLISECONDS, NDimensionalArray.class);
+    lastKnownResponse = pythonEngineRef.eval(
+        WorkerExecutionMode.ANY,currentWindowId, requestCounterForThisWindow,request);
+    for ( String evalCommandSubmitted: lastKnownResponse.keySet()) {
+      return lastKnownResponse.get(evalCommandSubmitted); // we just need one of the N workers response.
+    }
+    return null;
+  }
+
+  @Override
+  public void processPostSetUpPythonInstructions(ApexPythonEngine pythonEngineRef) throws ApexPythonInterpreterException
+  {
+    List<String> commandsToRun = new ArrayList<>();
+    commandsToRun.add("import sys");
+    commandsToRun.add("import numpy as np");
+    commandsToRun.add("intMatrix = np.ones((2,2),dtype=int)");
+    commandsToRun.add("floatMatrix = np.ones((2,2),dtype=float)");
+    pythonEngineRef.runCommands(WorkerExecutionMode.BROADCAST,0L,0L,
+        PythonRequestResponseUtil.buildRequestObjectForRunCommands(commandsToRun,1, TimeUnit.SECONDS));
+  }
+
+  public Map<String, PythonRequestResponse<NDimensionalArray>> getLastKnownResponse()
+  {
+    return lastKnownResponse;
+  }
+
+  public void setLastKnownResponse(Map<String, PythonRequestResponse<NDimensionalArray>> lastKnownResponse)
+  {
+    this.lastKnownResponse = lastKnownResponse;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java
new file mode 100644
index 0000000..7a1c1c1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.GenericCommandsRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+import org.apache.apex.malhar.python.test.BasePythonTest;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+public class BaseJEPTest extends BasePythonTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(BaseJEPTest.class);
+
+  public static boolean JEP_INITIALIZED = false;
+
+  private static  Object lockToInitializeJEP = new Object();
+
+  static InterpreterThread pythonEngineThread;
+
+  static InterpreterWrapper interpreterWrapper;
+
+  static JepPythonEngine jepPythonEngine;
+
+  static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+  static BlockingQueue<PythonRequestResponse> requestQueue =
+      new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+  static BlockingQueue<PythonRequestResponse> responseQueue =
+      new DisruptorBlockingQueue<PythonRequestResponse>(8,SpinPolicy.WAITING);
+
+  static BlockingQueue<PythonRequestResponse> delayedResponseQueueForWrapper =
+      new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+
+  public static void initJEPThread() throws Exception
+  {
+    if (!JEP_INITIALIZED) {
+      synchronized (lockToInitializeJEP) {
+        if (!JEP_INITIALIZED) {
+          // Interpreter for thread based tests
+          pythonEngineThread = new InterpreterThread(requestQueue,responseQueue,"unittests-1");
+          pythonEngineThread.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+          executorService.submit(pythonEngineThread);
+
+          // interpreter wrapper for wrapper based tests
+          interpreterWrapper = new InterpreterWrapper("unit-test-wrapper",delayedResponseQueueForWrapper,
+              SpinPolicy.SPINNING);
+          interpreterWrapper.startInterpreter();
+
+          // JEP python engine tests
+          jepPythonEngine = new JepPythonEngine("unit-tests-jeppythonengine",5);
+          jepPythonEngine.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+          jepPythonEngine.startInterpreter();
+          JEP_INITIALIZED = true;
+        }
+      }
+    }
+  }
+
+  private void setCommonConstructsForRequestResponseObject(PythonCommandType commandType,
+      PythonInterpreterRequest request, PythonRequestResponse requestResponse )
+    throws ApexPythonInterpreterException
+  {
+    request.setCommandType(commandType);
+    requestResponse.setRequestStartTime(System.currentTimeMillis());
+    requestResponse.setRequestId(1L);
+    requestResponse.setWindowId(1L);
+    switch (commandType) {
+      case EVAL_COMMAND:
+        EvalCommandRequestPayload payload = new EvalCommandRequestPayload();
+        request.setEvalCommandRequestPayload(payload);
+        break;
+      case METHOD_INVOCATION_COMMAND:
+        MethodCallRequestPayload methodCallRequest = new MethodCallRequestPayload();
+        request.setMethodCallRequest(methodCallRequest);
+        break;
+      case SCRIPT_COMMAND:
+        ScriptExecutionRequestPayload scriptPayload = new ScriptExecutionRequestPayload();
+        request.setScriptExecutionRequestPayload(scriptPayload);
+        break;
+      case GENERIC_COMMANDS:
+        GenericCommandsRequestPayload payloadForGenericCommands = new GenericCommandsRequestPayload();
+        request.setGenericCommandsRequestPayload(payloadForGenericCommands);
+        break;
+      default:
+        throw new ApexPythonInterpreterException("Unsupported command type");
+    }
+
+  }
+
+  public PythonRequestResponse<Void> buildRequestResponseObjectForVoidPayload(PythonCommandType commandType)
+    throws Exception
+  {
+    PythonRequestResponse<Void> requestResponse = new PythonRequestResponse();
+    PythonInterpreterRequest<Void> request = new PythonInterpreterRequest<>(Void.class);
+    PythonInterpreterResponse<Void> response = new PythonInterpreterResponse<>(Void.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    requestResponse.setPythonInterpreterResponse(response);
+    setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+    return requestResponse;
+  }
+
+  public PythonRequestResponse<Long> buildRequestResponseObjectForLongPayload(
+      PythonCommandType commandType) throws Exception
+  {
+    PythonRequestResponse<Long> requestResponse = new PythonRequestResponse();
+    PythonInterpreterRequest<Long> request = new PythonInterpreterRequest<>(Long.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    PythonInterpreterResponse<Long> response = new PythonInterpreterResponse<>(Long.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    requestResponse.setPythonInterpreterResponse(response);
+    setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+    return requestResponse;
+  }
+
+
+
+  public PythonRequestResponse<Integer> buildRequestResponseObjectForIntPayload(
+      PythonCommandType commandType) throws Exception
+  {
+    PythonRequestResponse<Integer> requestResponse = new PythonRequestResponse();
+    PythonInterpreterRequest<Integer> request = new PythonInterpreterRequest<>(Integer.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    PythonInterpreterResponse<Integer> response = new PythonInterpreterResponse<>(Integer.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    requestResponse.setPythonInterpreterResponse(response);
+    setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+    return requestResponse;
+  }
+
+
+  protected PythonRequestResponse<Void> runCommands(List<String> commands) throws Exception
+  {
+    PythonRequestResponse<Void> runCommandsRequest = buildRequestResponseObjectForVoidPayload(
+        PythonCommandType.GENERIC_COMMANDS);
+    runCommandsRequest.getPythonInterpreterRequest().getGenericCommandsRequestPayload().setGenericCommands(commands);
+    pythonEngineThread.getRequestQueue().put(runCommandsRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    return pythonEngineThread.getResponseQueue().poll(1, TimeUnit.SECONDS);
+  }
+
+
+  protected PythonInterpreterRequest<Long> buildRequestObjectForLongEvalCommand(String command, String returnVar,
+      Map<String,Object> paramsForEval, long timeOut, TimeUnit timeUnit, boolean deleteVariable)
+  {
+    PythonInterpreterRequest<Long> request = new PythonInterpreterRequest<>(Long.class);
+    request.setTimeout(timeOut);
+    request.setTimeUnit(timeUnit);
+    EvalCommandRequestPayload evalCommandRequestPayload = new EvalCommandRequestPayload();
+    request.setEvalCommandRequestPayload(evalCommandRequestPayload);
+    evalCommandRequestPayload.setParamsForEvalCommand(paramsForEval);
+    evalCommandRequestPayload.setDeleteVariableAfterEvalCall(deleteVariable);
+    evalCommandRequestPayload.setVariableNameToExtractInEvalCall(returnVar);
+    evalCommandRequestPayload.setEvalCommand(command);
+    request.setExpectedReturnType(Long.class);
+    return request;
+  }
+
+  protected PythonInterpreterRequest<Void> buildRequestObjectForVoidGenericCommand(List<String> commands, long timeOut,
+      TimeUnit timeUnit)
+  {
+    PythonInterpreterRequest<Void> genericCommandRequest = new PythonInterpreterRequest<>(Void.class);
+    genericCommandRequest.setTimeout(timeOut);
+    genericCommandRequest.setTimeUnit(timeUnit);
+    GenericCommandsRequestPayload genericCommandsRequestPayload = new GenericCommandsRequestPayload();
+    genericCommandsRequestPayload.setGenericCommands(commands);
+    genericCommandRequest.setExpectedReturnType(Void.class);
+    genericCommandRequest.setGenericCommandsRequestPayload(genericCommandsRequestPayload);
+    return genericCommandRequest;
+  }
+
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java
new file mode 100644
index 0000000..9e37605
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class InterpreterThreadTest extends BaseJEPTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(InterpreterThreadTest.class);
+
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testRunCommands() throws Exception
+  {
+    long currentTime = System.currentTimeMillis();
+    File tempFile = File.createTempFile("apexpythonunittestruncommands-", ".txt");
+    tempFile.deleteOnExit();
+    String filePath = tempFile.getAbsolutePath();
+    assertEquals(0L,tempFile.length());
+
+    List<String> commands = new ArrayList();
+    commands.add("fileHandle  = open('" + filePath + "', 'w')");
+    commands.add("fileHandle.write('" + currentTime + "')");
+    commands.add("fileHandle.flush()");
+    commands.add("fileHandle.close()");
+    runCommands(commands);
+    assertEquals(("" + currentTime).length(), tempFile.length());
+
+    List<String> errorCommands = new ArrayList();
+    errorCommands.add("1+2");
+    errorCommands.add("3+");
+    PythonRequestResponse<Void> response = runCommands(errorCommands);
+    Map<String,Boolean> responseStatus = response.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(responseStatus.get(errorCommands.get(0)));
+    assertFalse(responseStatus.get(errorCommands.get(1)));
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testMethodCall() throws Exception
+  {
+    String methodName = "jepMultiply";
+    List<String> commands = new ArrayList();
+    commands.add("def " + methodName + "(firstnum, secondnum):\n" +
+        "\treturn (firstnum * secondnum)\n"); // Note that this cannot be split as multiple commands
+    runCommands(commands);
+
+    List<Object> params = new ArrayList<>();
+    params.add(5L);
+    params.add(25L);
+
+    PythonRequestResponse<Long> methodCallRequest = buildRequestResponseObjectForLongPayload(
+        PythonCommandType.METHOD_INVOCATION_COMMAND);
+    MethodCallRequestPayload requestPayload = methodCallRequest.getPythonInterpreterRequest().getMethodCallRequest();
+    requestPayload.setNameOfMethod(methodName);
+    requestPayload.setArgs(params);
+    methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    PythonRequestResponse<Long> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+        TimeUnit.SECONDS);
+    assertEquals(methodCallResponse.getPythonInterpreterResponse().getResponse(),125L);
+    Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(commandStatus.get(methodName));
+
+    params.remove(1);
+    methodCallRequest = buildRequestResponseObjectForLongPayload(PythonCommandType.METHOD_INVOCATION_COMMAND);
+    requestPayload = methodCallRequest
+        .getPythonInterpreterRequest().getMethodCallRequest();
+    requestPayload.setNameOfMethod(methodName);
+    requestPayload.setArgs(params);
+    methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    methodCallResponse = pythonEngineThread.getResponseQueue().poll(1, TimeUnit.SECONDS);
+    commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertFalse(commandStatus.get(methodName));
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testScriptCall() throws Exception
+  {
+    File tempFileForScript = File.createTempFile("apexpythonunittestscript-", ".py");
+    tempFileForScript.deleteOnExit();
+    String filePathForFactorialScript = tempFileForScript.getAbsolutePath();
+    migrateFileFromResourcesFolderToTemp("factorial.py",filePathForFactorialScript);
+    PythonRequestResponse<Void> methodCallRequest = buildRequestResponseObjectForVoidPayload(
+        PythonCommandType.SCRIPT_COMMAND);
+    methodCallRequest.getPythonInterpreterRequest().getScriptExecutionRequestPayload().setScriptName(
+        filePathForFactorialScript);
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    PythonRequestResponse<Void> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+        TimeUnit.SECONDS);
+    Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(commandStatus.get(filePathForFactorialScript));
+    try (BufferedReader br = new BufferedReader(new FileReader("target/factorial-result.txt"))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        assertEquals(120,Integer.parseInt(line)); // asset factorial is calculated as written in script in resources
+        break; // There is only one line in the file per the python script
+      }
+    }
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testEvalCall() throws Exception
+  {
+    String expression = new String("x = a + b");
+    Random random = new Random();
+    int a = random.nextInt(100);
+    int b = random.nextInt(100);
+    Map<String,Object> argsForEval = new HashMap<>();
+    argsForEval.put("a",a);
+    argsForEval.put("b",b);
+    PythonRequestResponse<Long> methodCallRequest = buildRequestResponseObjectForLongPayload(
+        PythonCommandType.EVAL_COMMAND);
+    EvalCommandRequestPayload payload = methodCallRequest.getPythonInterpreterRequest().getEvalCommandRequestPayload();
+    payload.setEvalCommand(expression);
+    payload.setParamsForEvalCommand(argsForEval);
+    payload.setDeleteVariableAfterEvalCall(true);
+    payload.setVariableNameToExtractInEvalCall("x");
+    methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    PythonRequestResponse<Integer> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+        TimeUnit.SECONDS);
+    Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(commandStatus.get(expression));
+    assertEquals(methodCallResponse.getPythonInterpreterResponse().getResponse(),(long)(a + b));
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java
new file mode 100644
index 0000000..6c2877d
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class InterpreterWrapperTest extends BaseJEPTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(InterpreterWrapperTest.class);
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testTimeOuts() throws Exception
+  {
+    List<String> sequenceOfCommands = new ArrayList();
+    sequenceOfCommands.add("import time");
+    sequenceOfCommands.add("time.sleep(1)");
+
+    PythonInterpreterRequest<Void> requestOne = buildRequestObjectForVoidGenericCommand(
+        sequenceOfCommands,3,TimeUnit.SECONDS);
+    PythonRequestResponse<Void> resultOne = interpreterWrapper.runCommands(1L,1L, requestOne);
+    assertNotNull(resultOne);
+
+    requestOne.setTimeUnit(TimeUnit.MILLISECONDS);
+    requestOne.setTimeout(5);
+    PythonRequestResponse<Void> resultTWo = interpreterWrapper.runCommands(1L,1L,requestOne);
+    assertNull(resultTWo);
+
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testDelayedResponseQueue() throws Exception
+  {
+    List<String> sequenceOfCommands = new ArrayList();
+    sequenceOfCommands.add("import time");
+    sequenceOfCommands.add("x=4;time.sleep(1)");
+
+    PythonInterpreterRequest<Void> requestOne = buildRequestObjectForVoidGenericCommand(
+        sequenceOfCommands,300,TimeUnit.MILLISECONDS);
+    PythonRequestResponse<Void> resultOne = interpreterWrapper.runCommands(1L,1L,requestOne);
+
+    HashMap<String,Object> evalParams  = new HashMap<>();
+    evalParams.put("y", 2);
+
+    PythonInterpreterRequest<Long> requestTwo = buildRequestObjectForLongEvalCommand(
+        "x = x * y;time.sleep(1)","x",evalParams,10,TimeUnit.MILLISECONDS,false);
+    PythonRequestResponse<Long> result = interpreterWrapper.eval(1L,1L,requestTwo);
+
+    Thread.sleep(3000);
+
+    // only the next command will result in the queue getting drained of previous requests hence below
+    sequenceOfCommands = new ArrayList();
+    sequenceOfCommands.add("x=5");
+
+    PythonInterpreterRequest<Void> requestThree = buildRequestObjectForVoidGenericCommand(
+        sequenceOfCommands,300,TimeUnit.MILLISECONDS);
+    PythonRequestResponse<Void> resultThree = interpreterWrapper.runCommands(1L,1L,requestThree);
+
+    assertFalse(delayedResponseQueueForWrapper.isEmpty());
+    assertEquals(2, delayedResponseQueueForWrapper.drainTo(new ArrayList<>()));
+
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java
new file mode 100644
index 0000000..98a1ee1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class JepPythonEngineTest extends BaseJEPTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(JepPythonEngineTest.class);
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testSelectWorkerForCurrentCall() throws Exception
+  {
+    Set<String> overloadedWorkers = new HashSet<>();
+    List<String> busyCommands = new ArrayList<>();
+    busyCommands.add("import time");
+    busyCommands.add("time.sleep(5)");
+    PythonInterpreterRequest<Void> busyCommandRequest = buildRequestObjectForVoidGenericCommand(busyCommands,
+        2,TimeUnit.MILLISECONDS);
+
+    for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads() - 1; i++) {
+      InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+      aWrapper.runCommands(1L,i,busyCommandRequest);
+      assertTrue(aWrapper.isCurrentlyBusy());
+      overloadedWorkers.add(aWrapper.getInterpreterId());
+    }
+    InterpreterWrapper candidateWrapperForExecution = null;
+    InterpreterWrapper validCandidateWrapperForExecution = null;
+    int counterForNullWorkers = 0;
+    int counterForValidWorkers = 0;
+    for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      candidateWrapperForExecution = jepPythonEngine.selectWorkerForCurrentCall(i);
+      if ( candidateWrapperForExecution == null) {
+        counterForNullWorkers += 1;
+      } else {
+        counterForValidWorkers += 1;
+        validCandidateWrapperForExecution = candidateWrapperForExecution;
+      }
+    }
+    // numWorker threads  because the select worker calls iterates over all workers to
+    // get any of the free workers. We did not give any worker for the 5th worker and hence should pass for all calls
+    assertEquals(jepPythonEngine.getNumWorkerThreads(), counterForValidWorkers);
+    assertEquals( 0, counterForNullWorkers); // None of the attempts should fail to get a worker
+    // Also we can only get that worker which has not been assigned a sleep instruction
+    assertFalse(overloadedWorkers.contains(validCandidateWrapperForExecution.getInterpreterId()));
+    Thread.sleep(5000); // all the python workers must be free after this line
+    // we now test for all workers being busy
+    for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+      aWrapper.runCommands(1L,i,busyCommandRequest);
+    }
+    for (int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      candidateWrapperForExecution = jepPythonEngine.selectWorkerForCurrentCall(i);
+      assertNull(candidateWrapperForExecution);
+    }
+    Thread.sleep(5000); // ensures other tests in this class can use this engine after this test
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testWorkerExecutionMode() throws Exception
+  {
+    String methodName = "multiply";
+    List<String> commands = new ArrayList();
+    commands.add("def " + methodName + "(firstnum, secondnum):\n" +
+        "\treturn (firstnum * secondnum)\n"); // Note that this cannot be split as multiple commands
+
+    PythonInterpreterRequest<Void> requestForDef = buildRequestObjectForVoidGenericCommand(commands,1000,
+        TimeUnit.MILLISECONDS);
+
+    for (int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+      aWrapper.runCommands(1L,1L,requestForDef);
+    }
+
+    HashMap<String,Object> params = new HashMap<>();
+    params.put("y",3);
+    Map<String,PythonRequestResponse<Long>> response = jepPythonEngine.eval(WorkerExecutionMode.BROADCAST,
+        1L,1L,buildRequestObjectForLongEvalCommand("x=multiply(4,y)",
+        "x", params,1000,TimeUnit.MILLISECONDS,false));
+    for (String aWorkerId: response.keySet()) {
+      assertEquals(12L,response.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+    }
+    assertEquals(jepPythonEngine.getNumWorkerThreads(),response.size()); // ensure all workers responded
+
+    params = new HashMap<>();
+    params.put("y",6);
+    response = jepPythonEngine.eval(WorkerExecutionMode.ANY,
+      1L,1L,buildRequestObjectForLongEvalCommand("x=multiply(4,y)",
+      "x", params,1000,TimeUnit.MILLISECONDS,false));
+    for (String aWorkerId: response.keySet()) {
+      assertEquals(24L,response.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+    }
+    assertEquals(1,response.size()); // ensure all workers responded
+    Thread.sleep(5000); // ensure subsequent tests are not impacted by busy flags from current test
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testPostStartInterpreterLogic() throws Exception
+  {
+    JepPythonEngine pythonEngineForPostInit = new JepPythonEngine("unit-tests-jeppythonengine-preint",
+        5);
+    List<String> commandsForPreInit = new ArrayList<>();
+    commandsForPreInit.add("x=5");
+    PythonRequestResponse<Void> aHistoryCommand = buildRequestResponseObjectForVoidPayload(
+        PythonCommandType.GENERIC_COMMANDS);
+    aHistoryCommand.getPythonInterpreterRequest()
+        .getGenericCommandsRequestPayload().setGenericCommands(commandsForPreInit);
+    aHistoryCommand.getPythonInterpreterRequest().setTimeout(10);
+    aHistoryCommand.getPythonInterpreterRequest().setTimeUnit(TimeUnit.MILLISECONDS);
+    List<PythonRequestResponse> historyOfCommands = new ArrayList<>();
+    historyOfCommands.add(aHistoryCommand);
+    pythonEngineForPostInit.setCommandHistory(historyOfCommands);
+
+    pythonEngineForPostInit.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+    pythonEngineForPostInit.startInterpreter();
+
+    pythonEngineForPostInit.postStartInterpreter();
+
+    HashMap<String,Object> params = new HashMap<>();
+    params.put("y",4);
+    Map<String,PythonRequestResponse<Long>> resultOfExecution = pythonEngineForPostInit.eval(
+        WorkerExecutionMode.BROADCAST, 1L,1L, buildRequestObjectForLongEvalCommand(
+        "x=x+y","x",params,100,TimeUnit.MILLISECONDS,false));
+    assertEquals(pythonEngineForPostInit.getNumWorkerThreads(),resultOfExecution.size());
+    for (String aWorkerId : resultOfExecution.keySet()) {
+      assertEquals(9L, resultOfExecution.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+    }
+  }
+}
+
+
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java b/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java
new file mode 100644
index 0000000..0807da1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import java.io.File;
+
+import org.junit.Rule;
+
+import org.apache.commons.io.FileUtils;
+
+public class BasePythonTest
+{
+  @Rule
+  public PythonAvailabilityTestRule jepAvailabilityBasedTest = new PythonAvailabilityTestRule();
+
+  protected void migrateFileFromResourcesFolderToTemp(String resourceFileName,String targetFilePath) throws Exception
+  {
+    ClassLoader classLoader = getClass().getClassLoader();
+    File outFile = new File(targetFilePath);
+    FileUtils.copyInputStreamToFile(classLoader.getResourceAsStream(resourceFileName), outFile);
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java b/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java
new file mode 100644
index 0000000..efa3b7c
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation that helps in selectively triggering certain tests if JEP python is available at the time of
+ * launching of the tests
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface JepPythonTestContext
+{
+
+  boolean jepPythonBasedTest() default false;
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java b/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java
new file mode 100644
index 0000000..118e2fe
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.jep.BaseJEPTest;
+
+/**
+ * A Junit rule that helps in bypassing tests that cannot be done if the Python installation is not present.
+ * The unit tests will be triggered as soon as the switch -DjepInstalled=true is passed from the command line.
+ */
+public class PythonAvailabilityTestRule implements TestRule
+{
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(PythonAvailabilityTestRule.class);
+
+  public static final String JEP_LIBRARY_INSTALLED_SWITCH = "jepInstallPath";
+
+  @Override
+  public Statement apply(Statement base, Description description)
+  {
+    JepPythonTestContext testContext = description.getAnnotation(JepPythonTestContext.class);
+    String jepInstalledStrVal = System.getProperty(JEP_LIBRARY_INSTALLED_SWITCH);
+    boolean jepInstalled = false;
+    if (jepInstalledStrVal != null) {
+      jepInstalled = true;
+      LOG.debug("Using " + jepInstalledStrVal + " as the library path for python interpreter");
+    }
+    boolean runThisTest = true; // default is to run the test if no annotation is specified i.e. python is not required
+    if ( testContext != null) {
+      if ( (testContext.jepPythonBasedTest()) && (jepInstalled) ) {
+        runThisTest = true;
+      } else {
+        runThisTest = false;
+      }
+    }
+    if (runThisTest) {
+      if ( (jepInstalled) && (!BaseJEPTest.JEP_INITIALIZED)) {
+        try {
+          BaseJEPTest.initJEPThread();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return base;
+    } else {
+      // bypass the test altogether
+      return new Statement()
+      {
+        @Override
+        public void evaluate() throws Throwable
+        {
+          // Return an empty Statement object for those tests
+        }
+      };
+    }
+  }
+
+}
diff --git a/python/src/test/resources/META-INF/properties.xml b/python/src/test/resources/META-INF/properties.xml
new file mode 100644
index 0000000..652f981
--- /dev/null
+++ b/python/src/test/resources/META-INF/properties.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+<configuration>
+  <property>
+    <name>dt.operator.pythonprocessor.attr.JVM_OPTIONS</name>
+    <value> -Djava.library.path=${jepInstallPath}</value>
+  </property>
+</configuration>
diff --git a/python/src/test/resources/factorial.py b/python/src/test/resources/factorial.py
new file mode 100755
index 0000000..ca471b8
--- /dev/null
+++ b/python/src/test/resources/factorial.py
@@ -0,0 +1,49 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+
+import sys, getopt, os
+
+os.getcwd()
+outputfile = 'target/factorial-result.txt'
+number = 5
+
+
+def factorial(num):
+    if num <= 0:
+        return 1
+    else:
+        factorial = 1
+        for i in range(1,num + 1):
+            factorial = factorial*i
+        return factorial
+
+def writeFile(factorial):
+    fileHandle  = open(outputfile,'w')
+    fileHandle.write(factorial+'\n')
+    fileHandle.flush()
+    fileHandle.close()
+
+
+if __name__ == "__main__":
+   writeFile(str(factorial(number)))
+
+
diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ecb3668
--- /dev/null
+++ b/python/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.apex.malhar.python=INFO
+log4j.logger.org.apache.apex.malhar.python.base=INFO
+log4j.logger.org.apache.apex.malhar.python.base.jep=INFO

-- 
To stop receiving notification emails like this one, please contact
thw@apache.org.