You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/11 22:40:38 UTC

[GitHub] merlimat closed pull request #1758: Windowing for Pulsar Functions

merlimat closed pull request #1758: Windowing for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1758
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/example.yml b/conf/example.yml
index 5c052730de..58f9bf558c 100644
--- a/conf/example.yml
+++ b/conf/example.yml
@@ -21,10 +21,10 @@ tenant: "test"
 namespace: "test-namespace"
 name: "example"
 className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
-inputs: ["persistent://sample/standalone/ns1/test_src"]
+inputs: ["test_src"]
 userConfig:
-  "PublishTopic" : "persistent://sample/standalone/ns1/test_result"
+  "PublishTopic" : "test_result"
 
-output: "persistent://sample/standalone/ns1/test_result"
+output: "test_result"
 autoAck: true
 parallelism: 1
diff --git a/conf/window_example.yml b/conf/window_example.yml
new file mode 100644
index 0000000000..a4470d9333
--- /dev/null
+++ b/conf/window_example.yml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "test"
+namespace: "test-namespace"
+name: "example"
+className: "org.apache.pulsar.functions.api.examples.WindowFunction"
+inputs: ["test_src"]
+userConfig:
+  "PublishTopic" : "test_result"
+
+output: "test_result"
+autoAck: true
+parallelism: 1
+windowConfig:
+  windowLengthCount: 10
+  slidingIntervalCount: 5
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e53550637d..9f2ce10fc3 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,25 +18,19 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.isNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.IntStream;
-
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.converters.StringConverter;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -49,9 +43,6 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -60,28 +51,42 @@
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
-import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.Resources;
+import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
+import org.apache.pulsar.functions.windowing.WindowUtils;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.converters.StringConverter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-import com.google.gson.reflect.TypeToken;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -226,7 +231,16 @@ void processArguments() throws Exception {
         protected Long ram;
         @Parameter(names = "--disk", description = "The disk in bytes that need to be allocated per function instance(applicable only to docker runtime)")
         protected Long disk;
-
+        @Parameter(names = "--windowLengthCount", description = "")
+        protected Integer windowLengthCount;
+        @Parameter(names = "--windowLengthDurationMs", description = "")
+        protected Long windowLengthDurationMs;
+        @Parameter(names = "--slidingIntervalCount", description = "")
+        protected Integer slidingIntervalCount;
+        @Parameter(names = "--slidingIntervalDurationMs", description = "")
+        protected Long slidingIntervalDurationMs;
+        @Parameter(names = "--autoAck", description = "")
+        protected Boolean autoAck;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
 
@@ -289,27 +303,16 @@ void processArguments() throws Exception {
             }
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
-                Map<String, String> userConfigMap = new Gson().fromJson(userConfigString, type);
+                Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
                 functionConfig.setUserConfig(userConfigMap);
             }
-            if (null != jarFile) {
-                doJavaSubmitChecks(functionConfig);
-                functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
-                userCodeFile = jarFile;
-            } else if (null != pyFile) {
-                doPythonSubmitChecks(functionConfig);
-                functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
-                userCodeFile = pyFile;
-            } else {
-                throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
-            }
 
             if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
                 throw new RuntimeException("No input topic(s) specified for the function");
             }
 
             // Ensure that topics aren't being used as both input and output
-            verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());;
+            verifyNoTopicClash(functionConfig.getInputs(), functionConfig.getOutput());
 
             if (parallelism == null) {
                 if (functionConfig.getParallelism() == 0) {
@@ -335,56 +338,129 @@ void processArguments() throws Exception {
                 throw new IllegalArgumentException("Effectively-once processing semantics can only be achieved using a Failover subscription type");
             }
 
-            functionConfig.setAutoAck(true);
+            // window configs
+            WindowConfig windowConfig = functionConfig.getWindowConfig();
+            if (null != windowLengthCount) {
+                if (windowConfig == null) {
+                    windowConfig = new WindowConfig();
+                }
+                windowConfig.setWindowLengthCount(windowLengthCount);
+            }
+            if (null != windowLengthDurationMs) {
+                if (windowConfig == null) {
+                    windowConfig = new WindowConfig();
+                }
+                windowConfig.setWindowLengthDurationMs(windowLengthDurationMs);
+            }
+            if (null != slidingIntervalCount) {
+                if (windowConfig == null) {
+                    windowConfig = new WindowConfig();
+                }
+                windowConfig.setSlidingIntervalCount(slidingIntervalCount);
+            }
+            if (null != slidingIntervalDurationMs) {
+                if (windowConfig == null) {
+                    windowConfig = new WindowConfig();
+                }
+                windowConfig.setSlidingIntervalDurationMs(slidingIntervalDurationMs);
+            }
+            if (windowConfig != null) {
+                WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
+                // set auto ack to false since windowing framework is responsible
+                // for acking and not the function framework
+                if (autoAck != null && autoAck == true) {
+                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+                }
+                functionConfig.setAutoAck(false);
+            }
+            functionConfig.setWindowConfig(windowConfig);
+
+            if  (null != autoAck) {
+                functionConfig.setAutoAck(autoAck);
+            } else {
+                functionConfig.setAutoAck(true);
+            }
+
             inferMissingArguments(functionConfig);
-        }
 
-        public Class<?>[] getFunctionTypes(File file) {
-            if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
-                        functionConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), Function.class)
-                    && !Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), java.util.function.Function.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
-                        functionConfig.getClassName(), jarFile));
+            if (null != jarFile) {
+                doJavaSubmitChecks(functionConfig);
+                functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+                userCodeFile = jarFile;
+            } else if (null != pyFile) {
+                doPythonSubmitChecks(functionConfig);
+                functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
+                userCodeFile = pyFile;
+            } else {
+                throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
             }
+        }
+
+        private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) {
+            assertClassExistsInJar(file);
 
             Object userClass = Reflections.createInstance(functionConfig.getClassName(), file);
             Class<?>[] typeArgs;
-            if (userClass instanceof Function) {
-                Function pulsarFunction = (Function) userClass;
-                if (pulsarFunction == null) {
-                    throw new IllegalArgumentException(String.format("The Pulsar function class %s could not be instantiated from jar %s",
-                            functionConfig.getClassName(), jarFile));
-                }
-                typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-            } else {
+            // if window function
+            if (functionConfig.getWindowConfig() != null) {
                 java.util.function.Function function = (java.util.function.Function) userClass;
                 if (function == null) {
                     throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s",
                             functionConfig.getClassName(), jarFile));
                 }
                 typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+                if (!typeArgs[0].equals(Collection.class)) {
+                    throw new IllegalArgumentException("Window function must take a collection as input");
+                }
+                Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
+                Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
+                Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
+                typeArgs[0] = (Class<?>) actualInputType;
+            } else {
+                if (userClass instanceof Function) {
+                    Function pulsarFunction = (Function) userClass;
+                    if (pulsarFunction == null) {
+                        throw new IllegalArgumentException(String.format("The Pulsar function class %s could not be instantiated from jar %s",
+                                functionConfig.getClassName(), jarFile));
+                    }
+                    typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
+                } else {
+                    java.util.function.Function function = (java.util.function.Function) userClass;
+                    if (function == null) {
+                        throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated from jar %s",
+                                functionConfig.getClassName(), jarFile));
+                    }
+                    typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+                }
             }
+
             return typeArgs;
         }
 
+        private void assertClassExistsInJar(File file) {
+            if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) {
+                throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
+                        functionConfig.getClassName(), jarFile));
+            } else if (!Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), Function.class)
+                    && !Reflections.classInJarImplementsIface(file, functionConfig.getClassName(), java.util.function.Function.class)) {
+                throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
+                        functionConfig.getClassName(), jarFile));
+            }
+        }
+
         private void doJavaSubmitChecks(FunctionConfig functionConfig) {
             if (isNull(functionConfig.getClassName())) {
                 throw new IllegalArgumentException("You supplied a jar file but no main class");
             }
 
             File file = new File(jarFile);
-
             ClassLoader userJarLoader;
             try {
                 userJarLoader = Reflections.loadJar(file);
             } catch (MalformedURLException e) {
                 throw new RuntimeException("Failed to load user jar " + file, e);
             }
-
-            Class<?>[] typeArgs = getFunctionTypes(file);
-
+            Class<?>[] typeArgs = getFunctionTypes(file, functionConfig);
             // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
             // implements SerDe class
             functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
@@ -478,6 +554,10 @@ private void doPythonSubmitChecks(FunctionConfig functionConfig) {
             if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                 throw new RuntimeException("Effectively-once processing guarantees not yet supported in Python");
             }
+
+            if (functionConfig.getWindowConfig() != null) {
+                throw new IllegalArgumentException("There is currently no support windowing in python");
+            }
         }
 
         private void validateTopicName(String topic) {
@@ -562,7 +642,7 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
                 } catch (MalformedURLException e) {
                     throw new RuntimeException("Failed to load user jar " + file, e);
                 }
-                typeArgs = getFunctionTypes(file);
+                typeArgs = getFunctionTypes(file, functionConfig);
             }
 
             FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
@@ -610,22 +690,34 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
             if (functionConfig.getName() != null) {
                 functionDetailsBuilder.setName(functionConfig.getName());
             }
-            if (functionConfig.getClassName() != null) {
-                functionDetailsBuilder.setClassName(functionConfig.getClassName());
-            }
             if (functionConfig.getLogTopic() != null) {
                 functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
             }
             if (functionConfig.getRuntime() != null) {
                 functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime()));
             }
-            if (!functionConfig.getUserConfig().isEmpty()) {
-                functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig());
-            }
             if (functionConfig.getProcessingGuarantees() != null) {
                 functionDetailsBuilder.setProcessingGuarantees(
                         convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
             }
+
+            Map<String, Object> configs = new HashMap<>();
+            configs.putAll(functionConfig.getUserConfig());
+            // windowing related
+            WindowConfig windowConfig = functionConfig.getWindowConfig();
+            if (windowConfig != null) {
+                windowConfig.setActualWindowFunctionClassName(functionConfig.getClassName());
+                configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
+                // set class name to window function executor
+                functionDetailsBuilder.setClassName(WindowFunctionExecutor.class.getName());
+
+            } else {
+                if (functionConfig.getClassName() != null) {
+                    functionDetailsBuilder.setClassName(functionConfig.getClassName());
+                }
+            }
+            functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
+
             functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
             functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
             if (functionConfig.getResources() != null) {
@@ -922,9 +1014,8 @@ DownloadFunction getDownloader() {
     }
 
     private static FunctionConfig loadConfig(File file) throws IOException {
-
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return  mapper.readValue(file, FunctionConfig.class);
+        return mapper.readValue(file, FunctionConfig.class);
     }
 
     private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 01d8291e82..ff4afcf482 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -117,14 +117,14 @@
      * Get a map of all user-defined key/value configs for the function
      * @return The full map of user-defined config values
      */
-    Map<String, String> getUserConfigMap();
+    Map<String, Object> getUserConfigMap();
 
     /**
      * Get any user-defined key/value
      * @param key The key
      * @return The Optional value specified by the user for that key.
      */
-    Optional<String> getUserConfigValue(String key);
+    Optional<Object> getUserConfigValue(String key);
 
     /**
      * Get any user-defined key/value or a default value if none is present
@@ -132,7 +132,7 @@
      * @param defaultValue
      * @return Either the user config value associated with a given key or a supplied default value
      */
-    String getUserConfigValueOrDefault(String key, String defaultValue);
+    Object getUserConfigValueOrDefault(String key, Object defaultValue);
 
     /**
      * Record a user defined metric
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 563e8e34f4..d1971d0879 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -93,6 +96,7 @@ public void update(double value) {
     @Getter
     @Setter
     private StateContextImpl stateContext;
+    private Map<String, Object> userConfigs;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        ClassLoader classLoader, Consumer inputConsumer) {
@@ -109,6 +113,8 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
         producerConfiguration.setBatchingEnabled(true);
         producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
         producerConfiguration.setMaxPendingMessages(1000000);
+        userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
+                new TypeToken<Map<String, Object>>(){}.getType());
     }
 
     public void setCurrentMessageContext(MessageId messageId, String topicName) {
@@ -181,18 +187,19 @@ public Logger getLogger() {
     }
 
     @Override
-    public Optional<String> getUserConfigValue(String key) {
-        return Optional.ofNullable(config.getFunctionDetails().getUserConfigOrDefault(key, null));
+    public Optional<Object> getUserConfigValue(String key) {
+
+        return Optional.ofNullable(userConfigs.getOrDefault(key, null));
     }
 
     @Override
-    public String getUserConfigValueOrDefault(String key, String defaultValue) {
+    public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
         return getUserConfigValue(key).orElse(defaultValue);
     }
 
     @Override
-    public Map<String, String> getUserConfigMap() {
-        return config.getFunctionDetails().getUserConfigMap();
+    public Map<String, Object> getUserConfigMap() {
+        return userConfigs;
     }
 
     @Override
@@ -220,7 +227,9 @@ public void incrCounter(String key, long amount) {
                 return retval;
             }
         }
-
+        if (StringUtils.isEmpty(serDeClassName)) {
+            serDeClassName = DefaultSerDe.class.getName();
+        }
         if (!publishSerializers.containsKey(serDeClassName)) {
             SerDe serDe;
             if (serDeClassName.equals(DefaultSerDe.class.getName())) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3ef7bacdcd..aec1e8e159 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -23,6 +23,7 @@
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
@@ -51,7 +52,6 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
@@ -479,7 +479,8 @@ public void setupInput() throws Exception {
         }
         this.source = (Source) object;
 
-        this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), Map.class));
+        this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
+                new TypeToken<Map<String, Object>>(){}.getType()));
     }
 
     public void setupOutput() throws Exception {
@@ -513,6 +514,6 @@ public void setupOutput() throws Exception {
         } else {
             throw new RuntimeException("Sink does not implement correct interface");
         }
-        this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), Map.class));
+        this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()));
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java
new file mode 100644
index 0000000000..728df90371
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/DefaultEvictionContext.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+public class DefaultEvictionContext implements EvictionContext {
+
+    /**
+     * Current timestamp, exposed as the reference time of this context
+     */
+    private final Long referenceTime;
+
+    /**
+     * Current event count in window; null when not passed to the constructor
+     */
+    private final Long currentCount;
+
+    /**
+     * User set sliding window count; null when not passed to the constructor
+     */
+    private final Long slidingCount;
+
+    /**
+     * User set sliding window interval; null when not passed to the constructor
+     */
+    private final Long slidingInterval;
+
+    public DefaultEvictionContext(Long referenceTime) {
+        this(referenceTime, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount) {
+        this(referenceTime, currentCount, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) {
+        this(referenceTime, currentCount, slidingCount, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount, Long
+            slidingInterval) {
+        this.referenceTime = referenceTime;
+        this.currentCount = currentCount;
+        this.slidingCount = slidingCount;
+        this.slidingInterval = slidingInterval;
+    }
+
+    @Override
+    public Long getReferenceTime() {
+        return referenceTime;
+    }
+
+    @Override
+    public Long getCurrentCount() {
+        return currentCount;
+    }
+
+    @Override
+    public Long getSlidingCount() {
+        return slidingCount;
+    }
+
+    @Override
+    public Long getSlidingInterval() {
+        return slidingInterval;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
new file mode 100644
index 0000000000..19beb5eb65
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+/**
+ * An event is a wrapper object that gets stored in the window.
+ *
+ * @param <T> the type of the object that is wrapped
+ */
+public interface Event<T> {
+    /**
+     * The event timestamp in milliseconds.
+     *
+     * @return the event timestamp in milliseconds.
+     */
+    long getTimestamp();
+
+    /**
+     * Returns the wrapped object.
+     *
+     * @return the wrapped object.
+     */
+    T get();
+
+    /**
+     * Whether this is a watermark event. Watermark events are used
+     * for tracking time while processing based on event timestamps.
+     *
+     * @return true if this is a watermark event
+     */
+    boolean isWatermark();
+
+
+    /**
+     * Gets the message id of this event.
+     *
+     * @return byte array of the message id
+     */
+    byte[] getMessageId();
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
new file mode 100644
index 0000000000..2497c8521c
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Plain {@link Event} implementation holding a wrapped object, its timestamp and a message id.
+ * It never reports itself as a watermark; {@code equals}, {@code hashCode} and
+ * {@code toString} are generated by Lombok from the fields.
+ *
+ * @param <T> the type of the wrapped object
+ */
+@EqualsAndHashCode
+@ToString
+public class EventImpl<T> implements Event<T> {
+    // Field names and order feed the Lombok-generated methods; keep them as they are.
+    private final T event;
+    private final long ts;
+    private final byte[] messageId;
+
+    /**
+     * Package-private constructor.
+     *
+     * @param event     the object to wrap
+     * @param ts        the event timestamp
+     * @param messageId the id of the message, as a byte array
+     */
+    EventImpl(T event, long ts, byte[] messageId) {
+        this.messageId = messageId;
+        this.ts = ts;
+        this.event = event;
+    }
+
+    /** @return the wrapped object */
+    @Override
+    public T get() {
+        return this.event;
+    }
+
+    /** @return the timestamp given at construction */
+    @Override
+    public long getTimestamp() {
+        return this.ts;
+    }
+
+    /** @return the message id array given at construction (not copied) */
+    @Override
+    public byte[] getMessageId() {
+        return this.messageId;
+    }
+
+    /** @return always {@code false} for this class */
+    @Override
+    public boolean isWatermark() {
+        return false;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
new file mode 100644
index 0000000000..d5700a8e5a
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+/**
+ * Context information that can be used by the eviction policy.
+ */
+public interface EvictionContext {
+    /**
+     * Returns the reference time that the eviction policy could use to
+     * evict the events. In the case of event time processing, this would be
+     * the watermark time.
+     *
+     * @return the reference time in milliseconds
+     */
+    Long getReferenceTime();
+
+    /**
+     * Returns the sliding count for count-based windows.
+     *
+     * @return the sliding count
+     */
+    Long getSlidingCount();
+
+
+    /**
+     * Returns the sliding interval for time-based windows.
+     *
+     * @return the sliding interval
+     */
+    Long getSlidingInterval();
+
+    /**
+     * Returns the current count of events in the queue up to the reference time,
+     * based on which count-based evictions can be performed.
+     *
+     * @return the current count
+     */
+    Long getCurrentCount();
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
new file mode 100644
index 0000000000..2144fc1d94
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+/**
+ * An eviction policy keeps track of events and decides, per event, whether it
+ * should be evicted from the window.
+ *
+ * @param <T> the type of event that is tracked.
+ * @param <S> the type of state that is used
+ */
+public interface EvictionPolicy<T, S> {
+
+    /**
+     * The outcome of a call to {@link EvictionPolicy#evict(Event)}.
+     */
+    enum Action {
+        /**
+         * Expire the event and remove it from the queue.
+         */
+        EXPIRE,
+
+        /**
+         * Process the event in the current window of events.
+         */
+        PROCESS,
+
+        /**
+         * Don't include the event in the current window, but keep it
+         * in the queue so it is evaluated as a part of future windows.
+         */
+        KEEP,
+
+        /**
+         * Stop processing the queue; there cannot be any more events
+         * satisfying the eviction policy.
+         */
+        STOP
+    }
+
+    /**
+     * Decides whether an event should be expired from the window, processed in the
+     * current window, or kept for later processing.
+     *
+     * @param event the input event
+     * @return the {@link EvictionPolicy.Action} to be taken based on the input event
+     */
+    Action evict(Event<T> event);
+
+    /**
+     * Tracks the event so that {@link EvictionPolicy#evict(Event)} can later
+     * decide whether to evict it.
+     *
+     * @param event the input event to be tracked
+     */
+    void track(Event<T> event);
+
+    /**
+     * Sets a context that the policy can use while evicting events.
+     * E.g. for TimeEvictionPolicy, this could be used to set the reference timestamp.
+     *
+     * @param context the eviction context
+     */
+    void setContext(EvictionContext context);
+
+    /**
+     * Returns the context currently held by this eviction policy.
+     *
+     * @return the eviction context
+     */
+    EvictionContext getContext();
+
+    /**
+     * Resets the eviction policy.
+     */
+    void reset();
+
+    /**
+     * Returns the runtime state that the framework checkpoints so the eviction policy
+     * can be restored in case of failures.
+     *
+     * @return the state
+     */
+    S getState();
+
+    /**
+     * Restores the eviction policy from state that the framework checkpointed earlier.
+     *
+     * @param state the state
+     */
+    void restoreState(S state);
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TimestampExtractor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TimestampExtractor.java
new file mode 100644
index 0000000000..bdcddf8c89
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TimestampExtractor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import java.io.Serializable;
+
+/**
+ * Interface to be implemented for extracting a timestamp from a tuple.
+ *
+ * @param <I> the type of the input tuple
+ */
+public interface TimestampExtractor<I> extends Serializable {
+    /**
+     * Returns the tuple timestamp indicating the time when the event happened.
+     *
+     * @param input the tuple to read the timestamp from
+     * @return the timestamp (presumably in milliseconds, like {@code Event#getTimestamp()};
+     *         TODO confirm against callers)
+     */
+    long extractTimestamp(I input);
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TriggerHandler.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TriggerHandler.java
new file mode 100644
index 0000000000..558f5ee57f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TriggerHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+/**
+ * The callback fired by a {@link TriggerPolicy} when its trigger
+ * condition is satisfied.
+ */
+public interface TriggerHandler {
+    /**
+     * The code to execute when the {@link TriggerPolicy} condition is satisfied.
+     *
+     * @return true if the window was evaluated with at least one event in the window, false otherwise
+     */
+    boolean onTrigger();
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TriggerPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TriggerPolicy.java
new file mode 100644
index 0000000000..b64dee9f42
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/TriggerPolicy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+/**
+ * Triggers the window calculations based on the policy.
+ *
+ * @param <T> the type of the event that is tracked
+ * @param <S> the type of state that is used
+ */
+public interface TriggerPolicy<T, S> {
+
+    /**
+     * Tracks the event; an implementation could use this to invoke the trigger.
+     *
+     * @param event the input event
+     */
+    void track(Event<T> event);
+
+    /**
+     * Resets the trigger policy.
+     */
+    void reset();
+
+    /**
+     * Starts the trigger policy. This can be used
+     * during recovery to start the triggers after
+     * recovery is complete.
+     */
+    void start();
+
+    /**
+     * Shuts the trigger policy down. Any clean up could be handled here.
+     */
+    void shutdown();
+
+    /**
+     * Returns the runtime state to be checkpointed by the framework for restoring the
+     * trigger policy in case of failures.
+     *
+     * @return the state
+     */
+    S getState();
+
+    /**
+     * Restores the trigger policy from the state that was earlier checkpointed by the framework.
+     *
+     * @param state the state
+     */
+    void restoreState(S state);
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEvent.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEvent.java
new file mode 100644
index 0000000000..20a976d960
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+/**
+ * An event that wraps no object and no message id, only a timestamp. It is used
+ * for tracking the progress of time when processing based on event timestamps.
+ *
+ * @param <T> the type parameter passed on to {@link EventImpl}
+ */
+public class WaterMarkEvent<T> extends EventImpl<T> {
+
+    /**
+     * Creates a watermark for the given timestamp.
+     *
+     * @param ts the watermark timestamp
+     */
+    public WaterMarkEvent(long ts) {
+        super(null, ts, null);
+    }
+
+    /**
+     * @return the inherited string form prefixed with {@code "WaterMarkEvent{} "}
+     */
+    @Override
+    public String toString() {
+        final String inherited = super.toString();
+        return "WaterMarkEvent{} " + inherited;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isWatermark() {
+        return true;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
new file mode 100644
index 0000000000..046d584f76
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.ThreadContext;
+import org.apache.pulsar.functions.api.Context;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks tuples across input topics and periodically emits watermark events.
+ * Watermark event timestamp is the minimum of the latest tuple timestamps
+ * across all the input topics (minus the lag). Once a watermark event is emitted
+ * any tuple coming with an earlier timestamp can be considered as late events.
+ */
+@Slf4j
+public class WaterMarkEventGenerator<T> implements Runnable {
+    private final WindowManager<T> windowManager;
+    private final long eventTsLagMs;
+    private final Set<String> inputTopics;
+    // Latest timestamp seen per input topic; written by track(), read by run().
+    private final Map<String, Long> topicToTs;
+    private final ScheduledExecutorService executorService;
+    private final long intervalMs;
+    private ScheduledFuture<?> executorFuture;
+    // Timestamp of the last watermark handed to the window manager (0 until the first one).
+    private volatile long lastWaterMarkTs;
+    private final Context context;
+
+    /**
+     * Creates a new WatermarkEventGenerator. The generator does nothing until
+     * {@link #start()} is called.
+     *
+     * @param windowManager The window manager this generator will submit watermark events to
+     * @param intervalMs The generator will check if it should generate a watermark event with this intervalMs
+     * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late
+     * @param inputTopics The input topics this generator is expected to handle
+     * @param context The function context; its tenant, namespace and function name are used
+     *                to label the logging thread context of the generator thread
+     */
+    public WaterMarkEventGenerator(WindowManager<T> windowManager, long intervalMs,
+                                   long eventTsLagMs, Set<String> inputTopics, Context context) {
+        this.windowManager = windowManager;
+        topicToTs = new ConcurrentHashMap<>();
+
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("watermark-event-generator-%d")
+                .setDaemon(true)
+                .build();
+        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+        this.intervalMs = intervalMs;
+        this.eventTsLagMs = eventTsLagMs;
+        this.inputTopics = inputTopics;
+        this.context = context;
+    }
+
+    /**
+     * Tracks the timestamp of the event from a topic, returns
+     * true if the event can be considered for processing or
+     * false if its a late event.
+     *
+     * @param inputTopic the topic the event arrived on
+     * @param ts the event timestamp
+     * @return true if {@code ts} is not older than the last emitted watermark
+     * @throws RuntimeException if the periodic watermark task has terminated with a failure
+     */
+    public boolean track(String inputTopic, long ts) {
+        // Keep the largest timestamp per topic in one atomic step. A separate get/put pair
+        // would let a concurrent caller overwrite a newer timestamp with an older one.
+        topicToTs.merge(inputTopic, ts, Long::max);
+        checkFailures();
+        return ts >= lastWaterMarkTs;
+    }
+
+    /**
+     * Periodic task: computes the current watermark timestamp and, if it moved past the
+     * last emitted one, adds a {@link WaterMarkEvent} to the window manager.
+     */
+    @Override
+    public void run() {
+        // initialize the thread context
+        ThreadContext.put("function", WindowUtils.getFullyQualifiedName(
+                context.getTenant(), context.getNamespace(), context.getFunctionName()));
+        try {
+            long waterMarkTs = computeWaterMarkTs();
+            if (waterMarkTs > lastWaterMarkTs) {
+                windowManager.add(new WaterMarkEvent<>(waterMarkTs));
+                lastWaterMarkTs = waterMarkTs;
+            }
+        } catch (Throwable th) {
+            log.error("Failed while processing watermark event ", th);
+            // rethrown so the failure ends up in executorFuture, where checkFailures() finds it
+            throw th;
+        }
+    }
+
+    /**
+     * Computes the min ts across all input topics, minus the configured lag.
+     *
+     * @return the minimum tracked timestamp minus {@code eventTsLagMs}; while fewer topics
+     *         have been tracked than there are input topics the minimum is taken as 0
+     */
+    private long computeWaterMarkTs() {
+        long ts = 0;
+        // only if some data has arrived on each input topic
+        if (topicToTs.size() >= inputTopics.size()) {
+            ts = Long.MAX_VALUE;
+            for (Map.Entry<String, Long> entry : topicToTs.entrySet()) {
+                ts = Math.min(ts, entry.getValue());
+            }
+        }
+        return ts - eventTsLagMs;
+    }
+
+    /**
+     * Surfaces a failure of the periodic task to the caller: if the scheduled future has
+     * completed, its outcome is fetched and any failure is rethrown as a RuntimeException.
+     */
+    private void checkFailures() {
+        if (executorFuture != null && executorFuture.isDone()) {
+            try {
+                executorFuture.get();
+            } catch (InterruptedException ex) {
+                // restore the interrupt flag so code further up the stack can still see it
+                Thread.currentThread().interrupt();
+                log.error("Got exception ", ex);
+                throw new RuntimeException(ex);
+            } catch (ExecutionException ex) {
+                log.error("Got exception ", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    /**
+     * Schedules this generator to run every {@code intervalMs} milliseconds, with the
+     * first run after one interval.
+     */
+    public void start() {
+        this.executorFuture = executorService.scheduleAtFixedRate(this, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Shuts the executor down, waiting up to two seconds for termination before
+     * forcing it with {@code shutdownNow()}.
+     */
+    public void shutdown() {
+        log.debug("Shutting down WaterMarkEventGenerator");
+        executorService.shutdown();
+
+        try {
+            if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException ie) {
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java
new file mode 100644
index 0000000000..5125c9e3d9
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Window.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import java.util.List;
+
+/**
+ * A view of events in a sliding window.
+ *
+ * @param <T> the type of event that this window contains.
+ */
+public interface Window<T> {
+    /**
+     * Gets the list of events in the window.
+     * <p>
+     * <b>Note: </b> If the number of tuples in the window is huge, invoking {@code get} would
+     * load all the tuples into memory and may throw an OOM exception; use windowing with
+     * persistence in that case.
+     * </p>
+     *
+     * @return the list of events in the window.
+     */
+    List<T> get();
+
+    /**
+     * Gets the list of newly added events in the window since the last time the window was generated.
+     *
+     * @return the list of newly added events in the window.
+     */
+    List<T> getNew();
+
+    /**
+     * Gets the list of events expired from the window since the last time the window was generated.
+     *
+     * @return the list of events expired from the window.
+     */
+    List<T> getExpired();
+
+    /**
+     * If processing based on event time, returns the window end time based on the watermark;
+     * otherwise returns the window end time based on processing time.
+     *
+     * @return the window end timestamp
+     */
+    Long getEndTimestamp();
+
+    /**
+     * Returns the window start timestamp. Will return null if the window length is not based on
+     * time duration.
+     *
+     * @return the window start timestamp or null if the window length is not time based
+     */
+    Long getStartTimestamp();
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
new file mode 100644
index 0000000000..1bb54cd727
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface WindowContext {
+    /**
+     * The name of the function that we are executing.
+     * @return The function name
+     */
+    String getFunctionName();
+
+    /**
+     * The id of the function that we are executing.
+     * @return The function id
+     */
+    String getFunctionId();
+
+    /**
+     * The id of the instance that invokes this function.
+     *
+     * @return the instance id
+     */
+    String getInstanceId();
+
+    /**
+     * The version of the function that we are executing.
+     * @return The version id
+     */
+    String getFunctionVersion();
+
+    /**
+     * The memory limit that this function is limited to.
+     * @return Memory limit in bytes
+     */
+    long getMemoryLimit();
+
+    /**
+     * The time budget in ms that the function is limited to.
+     * @return Time budget in ms
+     */
+    long getTimeBudgetInMs();
+
+    /**
+     * The time in ms remaining for this function execution to complete before it
+     * will be flagged as an error.
+     * @return Time remaining in ms
+     */
+    long getRemainingTimeInMs();
+
+    /**
+     * The logger object that can be used to log in a function.
+     * @return the logger object
+     */
+    Logger getLogger();
+
+    /**
+     * Gets any user defined key/value.
+     * @param key The key
+     * @return The value specified by the user for that key. null if no such key
+     */
+    String getUserConfigValue(String key);
+
+    /**
+     * Records a user defined metric.
+     * @param metricName The name of the metric
+     * @param value The value of the metric
+     */
+    void recordMetric(String metricName, double value);
+
+    /**
+     * Publishes an object to the topic, using the given serDe for serializing it.
+     * @param topicName The name of the topic for publishing
+     * @param object The object that needs to be published
+     * @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing
+     * @return A future for the publish operation (presumably completed once the object has
+     *         been published; TODO confirm against the underlying implementation)
+     */
+    CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName);
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
new file mode 100644
index 0000000000..c7b3919461
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import org.apache.pulsar.functions.api.Context;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link WindowContext} backed by a function {@link Context}: every call is
+ * forwarded to the wrapped context.
+ */
+public class WindowContextImpl implements WindowContext {
+
+    /** The function context that all calls are delegated to. */
+    private final Context context;
+
+    /**
+     * @param context the function context to delegate to
+     */
+    public WindowContextImpl(Context context) {
+        this.context = context;
+    }
+
+    @Override
+    public String getFunctionName() {
+        return this.context.getFunctionName();
+    }
+
+    @Override
+    public String getFunctionId() {
+        return this.context.getFunctionId();
+    }
+
+    @Override
+    public String getInstanceId() {
+        return this.context.getInstanceId();
+    }
+
+    // The six methods below used to call themselves (this.getX()) instead of the wrapped
+    // context, which recursed until a StackOverflowError; they now delegate like the rest.
+
+    @Override
+    public String getFunctionVersion() {
+        return this.context.getFunctionVersion();
+    }
+
+    @Override
+    public long getMemoryLimit() {
+        return this.context.getMemoryLimit();
+    }
+
+    @Override
+    public long getTimeBudgetInMs() {
+        return this.context.getTimeBudgetInMs();
+    }
+
+    @Override
+    public long getRemainingTimeInMs() {
+        return this.context.getRemainingTimeInMs();
+    }
+
+    @Override
+    public Logger getLogger() {
+        return this.context.getLogger();
+    }
+
+    @Override
+    public String getUserConfigValue(String key) {
+        return this.context.getUserConfigValue(key);
+    }
+
+    @Override
+    public void recordMetric(String metricName, double value) {
+        this.context.recordMetric(metricName, value);
+    }
+
+    @Override
+    public CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName) {
+        return this.context.publish(topicName, object, serDeClassName);
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
new file mode 100644
index 0000000000..a82a195a79
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
+import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
+import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
+import org.apache.pulsar.functions.windowing.evictors.WatermarkTimeEvictionPolicy;
+import org.apache.pulsar.functions.windowing.triggers.CountTriggerPolicy;
+import org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy;
+import org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
+import org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Function} that buffers incoming messages in a {@link WindowManager} and applies a
+ * user-supplied window function every time the window is activated.
+ *
+ * <p>Initialization is lazy: the window configuration is read from the user config of the
+ * {@link Context} handed to the first {@link #process(Object, Context)} call.
+ *
+ * @param <I> the type of the individual input messages
+ * @param <O> the type of the result produced for a window
+ */
+@Slf4j
+public class WindowFunctionExecutor<I, O> implements Function<I, O> {
+
+    /** Set once {@link #initialize(Context)} has built the window manager. */
+    private boolean initialized;
+    protected WindowConfig windowConfig;
+    private WindowManager<I> windowManager;
+    /** Non-null only when a timestamp extractor class is configured (event-time mode). */
+    private TimestampExtractor<I> timestampExtractor;
+    protected transient WaterMarkEventGenerator<I> waterMarkEventGenerator;
+
+    protected static final long DEFAULT_MAX_LAG_MS = 0; // no lag
+    protected static final long DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
+
+    /** The user function applied to the events of each activated window. */
+    protected java.util.function.Function<Collection<I>, O> windowFunction;
+
+    /**
+     * Reads the window configuration from the context, instantiates the user window function,
+     * builds the window manager and starts the watermark generator (if any) and trigger policy.
+     *
+     * @param context the function context supplying the user config
+     * @throws IllegalArgumentException if the window configuration is missing or the configured
+     *         window function is not usable
+     */
+    public void initialize(Context context) {
+        this.windowConfig = this.getWindowConfigs(context);
+        this.windowFunction = initializeUserFunction(this.windowConfig);
+        log.info("Window Config: {}", this.windowConfig);
+        this.windowManager = this.getWindowManager(this.windowConfig, context);
+        this.initialized = true;
+        this.start();
+    }
+
+    /**
+     * Instantiates the configured window function class with the thread context class loader
+     * and checks that it is a {@code java.util.function.Function} taking a {@link Collection}.
+     */
+    private java.util.function.Function<Collection<I>, O> initializeUserFunction(WindowConfig windowConfig) {
+        String actualWindowFunctionClassName = windowConfig.getActualWindowFunctionClassName();
+        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+        Object userClassObject = Reflections.createInstance(
+                actualWindowFunctionClassName,
+                clsLoader);
+        if (!(userClassObject instanceof java.util.function.Function)) {
+            throw new IllegalArgumentException("Window function does not implement the correct interface");
+        }
+        Class<?>[] typeArgs = TypeResolver.resolveRawArguments(
+                java.util.function.Function.class, userClassObject.getClass());
+        if (!typeArgs[0].equals(Collection.class)) {
+            throw new IllegalArgumentException("Window function must take a collection as input");
+        }
+        // Only the raw input type (Collection) can be verified at runtime; the element and
+        // result types are trusted to match I and O.
+        @SuppressWarnings("unchecked")
+        java.util.function.Function<Collection<I>, O> userFunction =
+                (java.util.function.Function<Collection<I>, O>) userClassObject;
+        return userFunction;
+    }
+
+    /**
+     * Extracts the {@link WindowConfig} stored under {@link WindowConfig#WINDOW_CONFIG_KEY} in
+     * the user config, then validates it and fills in defaults.
+     */
+    private WindowConfig getWindowConfigs(Context context) {
+
+        if (!context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).isPresent()) {
+            throw new IllegalArgumentException("Window Configs cannot be found");
+        }
+        // Round-trip the raw config value through JSON to bind it to a WindowConfig instance.
+        WindowConfig windowConfig = new Gson().fromJson(
+                (new Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())),
+                WindowConfig.class);
+
+        WindowUtils.validateAndSetDefaultsWindowConfig(windowConfig);
+        return windowConfig;
+    }
+
+    /**
+     * Builds the {@link WindowManager} together with its eviction and trigger policies. When a
+     * timestamp extractor is configured, it is instantiated and a watermark generator is created.
+     *
+     * @throws IllegalArgumentException if a late data topic is configured without a timestamp
+     *         extractor class
+     */
+    private WindowManager<I> getWindowManager(WindowConfig windowConfig, Context context) {
+
+        WindowLifecycleListener<Event<I>> lifecycleListener = newWindowLifecycleListener(context);
+        WindowManager<I> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
+
+        if (windowConfig.getTimestampExtractorClassName() != null) {
+            // Must be assigned before the policies are created: isEventTime() depends on it.
+            this.timestampExtractor = getTimeStampExtractor(windowConfig);
+
+            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager,
+                    windowConfig.getWatermarkEmitIntervalMs(),
+                    windowConfig.getMaxLagMs(), new HashSet<>(context.getInputTopics()), context);
+        } else if (windowConfig.getLateDataTopic() != null) {
+            throw new IllegalArgumentException(
+                    "Late data topic can be defined only when specifying a timestamp extractor class");
+        }
+
+        EvictionPolicy<I, ?> evictionPolicy = getEvictionPolicy(windowConfig);
+        TriggerPolicy<I, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager,
+                evictionPolicy, context);
+        manager.setEvictionPolicy(evictionPolicy);
+        manager.setTriggerPolicy(triggerPolicy);
+
+        return manager;
+    }
+
+    /**
+     * Loads and instantiates the configured timestamp extractor through its no-arg constructor
+     * and checks that its type argument matches the input type resolved for this function.
+     *
+     * @throws RuntimeException if the class cannot be loaded or instantiated, or if the types
+     *         are inconsistent
+     */
+    @SuppressWarnings("unchecked")
+    private TimestampExtractor<I> getTimeStampExtractor(WindowConfig windowConfig) {
+
+        Class<?> theCls;
+        try {
+            theCls = Class.forName(windowConfig.getTimestampExtractorClassName(),
+                    true, Thread.currentThread().getContextClassLoader());
+        } catch (ClassNotFoundException cnfe) {
+            throw new RuntimeException(
+                    String.format("Timestamp extractor class %s must be in class path",
+                            windowConfig.getTimestampExtractorClassName()), cnfe);
+        }
+
+        Object result;
+        try {
+            Constructor<?> constructor = theCls.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            result = constructor.newInstance();
+        } catch (InstantiationException ie) {
+            throw new RuntimeException("User class must be concrete", ie);
+        } catch (NoSuchMethodException e) {
+            // getDeclaredConstructor() with no arguments failed: there is no no-arg constructor.
+            throw new RuntimeException("User class must have a no-arg constructor", e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("User class constructor is not accessible", e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException("User class constructor throws exception", e);
+        }
+        Class<?>[] timestampExtractorTypeArgs = TypeResolver.resolveRawArguments(
+                TimestampExtractor.class, result.getClass());
+        Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, this.getClass());
+        if (!typeArgs[0].equals(timestampExtractorTypeArgs[0])) {
+            throw new RuntimeException(
+                    "Inconsistent types found between function input type and timestamp extractor type: "
+                            + " function type = " + typeArgs[0] + ", timestamp extractor type = "
+                            + timestampExtractorTypeArgs[0]);
+        }
+        return (TimestampExtractor<I>) result;
+    }
+
+    /**
+     * Selects the trigger policy: count based when a sliding interval count is configured,
+     * duration based otherwise; the watermark variants are used in event-time mode.
+     */
+    private TriggerPolicy<I, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<I> manager,
+                                                 EvictionPolicy<I, ?> evictionPolicy, Context context) {
+        if (windowConfig.getSlidingIntervalCount() != null) {
+            if (this.isEventTime()) {
+                return new WatermarkCountTriggerPolicy<>(
+                        windowConfig.getSlidingIntervalCount(), manager, evictionPolicy, manager);
+            } else {
+                return new CountTriggerPolicy<>(windowConfig.getSlidingIntervalCount(), manager, evictionPolicy);
+            }
+        } else {
+            if (this.isEventTime()) {
+                return new WatermarkTimeTriggerPolicy<>(windowConfig.getSlidingIntervalDurationMs(), manager,
+                        evictionPolicy, manager);
+            }
+            return new TimeTriggerPolicy<>(windowConfig.getSlidingIntervalDurationMs(), manager,
+                    evictionPolicy, context);
+        }
+    }
+
+    /**
+     * Selects the eviction policy: count based when a window length count is configured,
+     * duration based otherwise; the watermark variants are used in event-time mode.
+     */
+    private EvictionPolicy<I, ?> getEvictionPolicy(WindowConfig windowConfig) {
+        if (windowConfig.getWindowLengthCount() != null) {
+            if (this.isEventTime()) {
+                return new WatermarkCountEvictionPolicy<>(windowConfig.getWindowLengthCount());
+            } else {
+                return new CountEvictionPolicy<>(windowConfig.getWindowLengthCount());
+            }
+        } else {
+            if (this.isEventTime()) {
+                return new WatermarkTimeEvictionPolicy<>(
+                        windowConfig.getWindowLengthDurationMs(), windowConfig.getMaxLagMs());
+            } else {
+                return new TimeEvictionPolicy<>(windowConfig.getWindowLengthDurationMs());
+            }
+        }
+    }
+
+    /**
+     * Creates the listener wired into the window manager: expired events are acknowledged on
+     * the context, and window activations are forwarded to the user window function.
+     */
+    protected WindowLifecycleListener<Event<I>> newWindowLifecycleListener(Context context) {
+        return new WindowLifecycleListener<Event<I>>() {
+            @Override
+            public void onExpiry(List<Event<I>> events) {
+                for (Event<I> event : events) {
+                    context.ack(event.getMessageId());
+                }
+            }
+
+            @Override
+            public void onActivation(List<Event<I>> tuples, List<Event<I>> newTuples, List<Event<I>>
+                    expiredTuples, Long referenceTime) {
+                // Unwrap the events so the user function only sees the message payloads.
+                processWindow(
+                        context,
+                        tuples.stream().map(event -> event.get()).collect(Collectors.toList()),
+                        newTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
+                        expiredTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
+                        referenceTime);
+            }
+        };
+    }
+
+    /**
+     * Runs the window function on an activated window and publishes a non-null result to the
+     * output topic of the context. Any exception from the window function is rethrown wrapped
+     * in a {@link RuntimeException}.
+     */
+    private void processWindow(Context context, List<I> tuples, List<I> newTuples, List<I>
+            expiredTuples, Long referenceTime) {
+
+        O output = null;
+        try {
+            output = this.process(
+                    new WindowImpl<>(tuples, newTuples, expiredTuples, getWindowStartTs(referenceTime), referenceTime),
+                    new WindowContextImpl(context));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (output != null) {
+            context.publish(context.getOutputTopic(), output, context.getOutputSerdeClassName());
+        }
+    }
+
+    /**
+     * @return the window start timestamp (end minus window length), or {@code null} when the
+     *         end timestamp is null or the window is not duration based
+     */
+    private Long getWindowStartTs(Long endTs) {
+        Long res = null;
+        if (endTs != null && this.windowConfig.getWindowLengthDurationMs() != null) {
+            res = endTs - this.windowConfig.getWindowLengthDurationMs();
+        }
+        return res;
+    }
+
+    /** Starts the watermark generator, when one was created, and then the trigger policy. */
+    private void start() {
+        if (this.waterMarkEventGenerator != null) {
+            log.debug("Starting waterMarkEventGenerator");
+            this.waterMarkEventGenerator.start();
+        }
+
+        log.debug("Starting trigger policy");
+        this.windowManager.triggerPolicy.start();
+    }
+
+    /** Shuts down the watermark generator and the window manager, whichever of them exist. */
+    public void shutdown() {
+        if (this.waterMarkEventGenerator != null) {
+            this.waterMarkEventGenerator.shutdown();
+        }
+        if (this.windowManager != null) {
+            this.windowManager.shutdown();
+        }
+    }
+
+    /** @return {@code true} when a timestamp extractor has been configured and instantiated */
+    private boolean isEventTime() {
+        return this.timestampExtractor != null;
+    }
+
+    /**
+     * Adds a single input message to the window, initializing this executor on first use.
+     *
+     * <p>In event-time mode the timestamp comes from the timestamp extractor; a message the
+     * watermark generator refuses to track is published to the late data topic when one is
+     * configured (otherwise only logged) and is acknowledged right away. Without a timestamp
+     * extractor the current system time is used.
+     *
+     * @param input the incoming message
+     * @param context the function context of the message
+     * @return always {@code null}; window results are published from the activation callback
+     */
+    @Override
+    public O process(I input, Context context) throws Exception {
+        if (!this.initialized) {
+            initialize(context);
+        }
+        if (isEventTime()) {
+            long ts = this.timestampExtractor.extractTimestamp(input);
+            if (this.waterMarkEventGenerator.track(context.getCurrentMessageTopicName(), ts)) {
+                this.windowManager.add(input, ts, context.getMessageId());
+            } else {
+                if (this.windowConfig.getLateDataTopic() != null) {
+                    context.publish(this.windowConfig.getLateDataTopic(), input, context.getOutputSerdeClassName());
+                } else {
+                    log.info(String.format(
+                            "Received a late tuple %s with ts %d. This will not be processed.", input, ts));
+                }
+                context.ack(context.getMessageId());
+            }
+        } else {
+            this.windowManager.add(input, System.currentTimeMillis(), context.getMessageId());
+        }
+        return null;
+    }
+
+    /**
+     * Applies the user window function to the current events of the given window.
+     *
+     * @param inputWindow the activated window
+     * @param context the window context (not used by this implementation)
+     * @return the value returned by the window function
+     */
+    public O process(Window<I> inputWindow, WindowContext context) throws Exception {
+        return this.windowFunction.apply(inputWindow.get());
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowImpl.java
new file mode 100644
index 0000000000..de9f09cebe
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import java.util.List;
+
+/**
+ * Immutable snapshot of a window: the events currently in it, the events added and the events
+ * expired, plus the window's start and end timestamps.
+ */
+public class WindowImpl<T> implements Window<T> {
+    private final List<T> tuples;
+    private final List<T> newTuples;
+    private final List<T> expiredTuples;
+    private final Long startTimestamp;
+    private final Long endTimestamp;
+
+    public WindowImpl(List<T> tuples, List<T> newTuples, List<T> expiredTuples,
+                      Long startTimestamp, Long endTimestamp) {
+        this.tuples = tuples;
+        this.newTuples = newTuples;
+        this.expiredTuples = expiredTuples;
+        this.startTimestamp = startTimestamp;
+        this.endTimestamp = endTimestamp;
+    }
+
+    @Override
+    public List<T> get() {
+        return tuples;
+    }
+
+    @Override
+    public List<T> getNew() {
+        return newTuples;
+    }
+
+    @Override
+    public List<T> getExpired() {
+        return expiredTuples;
+    }
+
+    @Override
+    public Long getStartTimestamp() {
+        return startTimestamp;
+    }
+
+    @Override
+    public Long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("TupleWindowImpl{");
+        text.append("tuples=").append(tuples);
+        text.append(", newTuples=").append(newTuples);
+        text.append(", expiredTuples=").append(expiredTuples);
+        return text.append('}').toString();
+    }
+
+    /**
+     * Two windows are equal when their current, new and expired event lists are equal; the
+     * timestamps do not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        WindowImpl<?> other = (WindowImpl<?>) o;
+        return sameList(tuples, other.tuples)
+                && sameList(newTuples, other.newTuples)
+                && sameList(expiredTuples, other.expiredTuples);
+    }
+
+    /** Hash over the same three lists that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        int hash = hashOf(tuples);
+        hash = 31 * hash + hashOf(newTuples);
+        return 31 * hash + hashOf(expiredTuples);
+    }
+
+    /** Null-safe list equality: two nulls are equal, a null never equals a non-null list. */
+    private static boolean sameList(List<?> left, List<?> right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+    /** Null-safe list hash; a null list hashes to 0. */
+    private static int hashOf(List<?> list) {
+        return list == null ? 0 : list.hashCode();
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowLifecycleListener.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowLifecycleListener.java
new file mode 100644
index 0000000000..e22e135a83
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowLifecycleListener.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import java.util.List;
+
+/**
+ * Receives notifications from the {@link WindowManager} when tracked events expire and when
+ * the window is activated.
+ *
+ * @param <T> The type of Event in the window (e.g. Tuple).
+ */
+public interface WindowLifecycleListener<T> {
+
+    /**
+     * Invoked when the {@link EvictionPolicy} causes events to expire from the window.
+     *
+     * @param events the events that expired
+     */
+    void onExpiry(List<T> events);
+
+    /**
+     * Invoked when the {@link TriggerPolicy} activates the window. Unless overridden, this
+     * method rejects the call with an {@link UnsupportedOperationException}.
+     *
+     * @param events the events currently in the window
+     * @param newEvents the events added since the last activation
+     * @param expired the events expired since the last activation
+     * @param referenceTime the reference (event or processing) time that caused the activation
+     */
+    default void onActivation(
+            List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
new file mode 100644
index 0000000000..f121e13344
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
+import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
+import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
+
+/**
+ * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
+ * on expiry of events or activation of the window due to {@link TriggerPolicy}.
+ *
+ * @param <T> the type of event in the window.
+ */
+@Slf4j
+public class WindowManager<T> implements TriggerHandler {
+
+    /**
+     * Expire old events every EXPIRE_EVENTS_THRESHOLD to
+     * keep the window size in check.
+     * <p>
+     * Note that if the eviction policy is based on watermarks, events will not be evicted until a new
+     * watermark would cause them to be considered expired anyway, regardless of this limit
+     */
+    protected static final int EXPIRE_EVENTS_THRESHOLD = 100;
+
+    protected final Collection<Event<T>> queue;
+    protected EvictionPolicy<T, ?> evictionPolicy;
+    protected TriggerPolicy<T, ?> triggerPolicy;
+    protected final WindowLifecycleListener<Event<T>> windowLifecycleListener;
+    /** Events evicted by scans since the last activation; guarded by {@link #lock}. */
+    private final List<Event<T>> expiredEvents;
+    /** Events handed to the listener on the previous activation; used to find the new ones. */
+    private final Set<Event<T>> prevWindowEvents;
+    /** Number of events added since the last scan of the queue. */
+    private final AtomicInteger eventsSinceLastExpiry;
+    private final ReentrantLock lock;
+
+    /**
+     * Constructs a {@link WindowManager}
+     *
+     * @param lifecycleListener the {@link WindowLifecycleListener}
+     * @param queue a collection where the events in the window can be enqueued.
+     * <br/>
+     * <b>Note:</b> This collection has to be thread safe.
+     */
+    public WindowManager(WindowLifecycleListener<Event<T>> lifecycleListener, Collection<Event<T>> queue) {
+        windowLifecycleListener = lifecycleListener;
+        this.queue = queue;
+        expiredEvents = new ArrayList<>();
+        prevWindowEvents = new HashSet<>();
+        eventsSinceLastExpiry = new AtomicInteger();
+        lock = new ReentrantLock(true);
+    }
+
+    public void setEvictionPolicy(EvictionPolicy<T, ?> evictionPolicy) {
+        this.evictionPolicy = evictionPolicy;
+    }
+
+    public void setTriggerPolicy(TriggerPolicy<T, ?> triggerPolicy) {
+        this.triggerPolicy = triggerPolicy;
+    }
+
+    /**
+     * Add an event into the window, with the given ts as the tracking ts.
+     *
+     * @param event the event to track
+     * @param ts the timestamp
+     * @param messageId the id of the message carrying the event, stored with the event
+     */
+    public void add(T event, long ts, byte[] messageId) {
+        add(new EventImpl<>(event, ts, messageId));
+    }
+
+    /**
+     * Tracks a window event
+     *
+     * @param windowEvent the window event to track
+     */
+    public void add(Event<T> windowEvent) {
+        // watermark events are not added to the queue.
+        if (windowEvent.isWatermark()) {
+            log.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
+        } else {
+            queue.add(windowEvent);
+        }
+        track(windowEvent);
+        compactWindow();
+    }
+
+    /**
+     * The callback invoked by the trigger policy.
+     *
+     * @return {@code true} if the window contained events and the listener was activated
+     */
+    @Override
+    public boolean onTrigger() {
+        List<Event<T>> windowEvents = null;
+        List<Event<T>> expired = null;
+
+        // Acquire outside the try block so that unlock() in finally only runs when the lock
+        // is actually held.
+        lock.lock();
+        try {
+            /*
+             * scan the entire window to handle out of order events in
+             * the case of time based windows.
+             */
+            windowEvents = scanEvents(true);
+            expired = new ArrayList<>(expiredEvents);
+            expiredEvents.clear();
+        } finally {
+            lock.unlock();
+        }
+
+        List<Event<T>> events = new ArrayList<>();
+        List<Event<T>> newEvents = new ArrayList<>();
+        for (Event<T> event : windowEvents) {
+            events.add(event);
+            if (!prevWindowEvents.contains(event)) {
+                newEvents.add(event);
+            }
+        }
+        prevWindowEvents.clear();
+        if (!events.isEmpty()) {
+            prevWindowEvents.addAll(windowEvents);
+            log.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
+            windowLifecycleListener.onActivation(events, newEvents, expired,
+                    evictionPolicy.getContext().getReferenceTime());
+        } else {
+            log.debug("No events in the window, skipping onActivation");
+        }
+        triggerPolicy.reset();
+        return !events.isEmpty();
+    }
+
+    public void shutdown() {
+        log.debug("Shutting down WindowManager");
+        if (triggerPolicy != null) {
+            triggerPolicy.shutdown();
+        }
+    }
+
+    /**
+     * expires events that fall out of the window every
+     * EXPIRE_EVENTS_THRESHOLD so that the window does not grow
+     * too big.
+     */
+    protected void compactWindow() {
+        if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
+            scanEvents(false);
+        }
+    }
+
+    /**
+     * feed the event to the eviction and trigger policies
+     * for bookkeeping and optionally firing the trigger.
+     */
+    private void track(Event<T> windowEvent) {
+        evictionPolicy.track(windowEvent);
+        triggerPolicy.track(windowEvent);
+    }
+
+    /**
+     * Scan events in the queue, using the expiration policy to check
+     * if the event should be evicted or not.
+     *
+     * @param fullScan if set, will scan the entire queue; if not set, will stop
+     * as soon as an event not satisfying the expiration policy is found
+     * @return the list of events to be processed as a part of the current window
+     */
+    private List<Event<T>> scanEvents(boolean fullScan) {
+        log.debug("Scan events, eviction policy {}", evictionPolicy);
+        List<Event<T>> eventsToExpire = new ArrayList<>();
+        List<Event<T>> eventsToProcess = new ArrayList<>();
+
+        // Acquire outside the try block so that unlock() in finally only runs when the lock
+        // is actually held. The lock is reentrant, so the nested call from onTrigger() is fine.
+        lock.lock();
+        try {
+            Iterator<Event<T>> it = queue.iterator();
+            while (it.hasNext()) {
+                Event<T> windowEvent = it.next();
+                EvictionPolicy.Action action = evictionPolicy.evict(windowEvent);
+                if (action == EXPIRE) {
+                    eventsToExpire.add(windowEvent);
+                    it.remove();
+                } else if (!fullScan || action == STOP) {
+                    break;
+                } else if (action == PROCESS) {
+                    eventsToProcess.add(windowEvent);
+                }
+            }
+            expiredEvents.addAll(eventsToExpire);
+        } finally {
+            lock.unlock();
+        }
+        eventsSinceLastExpiry.set(0);
+        log.debug("[{}] events expired from window.", eventsToExpire.size());
+        if (!eventsToExpire.isEmpty()) {
+            log.debug("invoking windowLifecycleListener.onExpiry");
+            windowLifecycleListener.onExpiry(eventsToExpire);
+        }
+        return eventsToProcess;
+    }
+
+    /**
+     * Scans the event queue and returns the next earliest event ts
+     * between the startTs and endTs
+     *
+     * @param startTs the start ts (exclusive)
+     * @param endTs the end ts (inclusive)
+     * @return the earliest event ts between startTs and endTs, or {@link Long#MAX_VALUE} when
+     * no event falls in that range
+     */
+    public long getEarliestEventTs(long startTs, long endTs) {
+        long minTs = Long.MAX_VALUE;
+        for (Event<T> event : queue) {
+            if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+                minTs = Math.min(minTs, event.getTimestamp());
+            }
+        }
+        return minTs;
+    }
+
+    /**
+     * Scans the event queue and returns number of events having
+     * timestamp less than or equal to the reference time.
+     *
+     * @param referenceTime the reference timestamp in millis
+     * @return the count of events with timestamp less than or equal to referenceTime
+     */
+    public int getEventCount(long referenceTime) {
+        int count = 0;
+        for (Event<T> event : queue) {
+            if (event.getTimestamp() <= referenceTime) {
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Scans the event queue and returns the list of event ts
+     * falling between startTs (exclusive) and endTs (inclusive)
+     * at each sliding interval counts.
+     *
+     * @param startTs the start timestamp (exclusive)
+     * @param endTs the end timestamp (inclusive)
+     * @param slidingCount the sliding interval count
+     * @return the list of event ts
+     */
+    public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
+        List<Long> timestamps = new ArrayList<>();
+        if (endTs > startTs) {
+            int count = 0;
+            long ts = Long.MIN_VALUE;
+            for (Event<T> event : queue) {
+                if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
+                    // Record the highest timestamp seen so far each time another
+                    // slidingCount events have been counted.
+                    ts = Math.max(ts, event.getTimestamp());
+                    if (++count % slidingCount == 0) {
+                        timestamps.add(ts);
+                    }
+                }
+            }
+        }
+        return timestamps;
+    }
+
+    @Override
+    public String toString() {
+        return "WindowManager{" + "evictionPolicy=" + evictionPolicy + ", triggerPolicy="
+                + triggerPolicy + '}';
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
new file mode 100644
index 0000000000..6de61c2abb
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowUtils.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import org.apache.pulsar.functions.utils.WindowConfig;
+
+public class WindowUtils {
+    public static String getFullyQualifiedName(String tenant, String namespace, String name) {
+        return String.format("%s/%s/%s", tenant, namespace, name);
+    }
+
+    public static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig) {
+        if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
+            throw new IllegalArgumentException("Window length is not specified");
+        }
+
+        if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
+            throw new IllegalArgumentException(
+                    "Window length for time and count are set! Please set one or the other.");
+        }
+
+        if (windowConfig.getWindowLengthCount() != null) {
+            if (windowConfig.getWindowLengthCount() <= 0) {
+                throw new IllegalArgumentException(
+                        "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
+            }
+        }
+
+        if (windowConfig.getWindowLengthDurationMs() != null) {
+            if (windowConfig.getWindowLengthDurationMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
+            }
+        }
+
+        if (windowConfig.getSlidingIntervalCount() != null) {
+            if (windowConfig.getSlidingIntervalCount() <= 0) {
+                throw new IllegalArgumentException(
+                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
+            }
+        }
+
+        if (windowConfig.getSlidingIntervalDurationMs() != null) {
+            if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
+                throw new IllegalArgumentException(
+                        "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
+            }
+        }
+
+        if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) {
+            windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
+        }
+
+        if (windowConfig.getWindowLengthCount() != null && windowConfig.getSlidingIntervalCount() == null) {
+            windowConfig.setSlidingIntervalCount(windowConfig.getWindowLengthCount());
+        }
+
+        if (windowConfig.getTimestampExtractorClassName() != null) {
+            if (windowConfig.getMaxLagMs() != null) {
+                if (windowConfig.getMaxLagMs() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
+                }
+            } else {
+                windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS);
+            }
+            if (windowConfig.getWatermarkEmitIntervalMs() != null) {
+                if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
+                    throw new IllegalArgumentException(
+                            "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
+                }
+            } else {
+                windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
+            }
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/CountEvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/CountEvictionPolicy.java
new file mode 100644
index 0000000000..78b4719847
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/CountEvictionPolicy.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.evictors;
+
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionContext;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An eviction policy that tracks event counts and can
+ * evict based on a threshold count.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class CountEvictionPolicy<T> implements EvictionPolicy<T, Long> {
+    protected final int threshold;
+    protected final AtomicLong currentCount;
+    private EvictionContext context;
+
+    public CountEvictionPolicy(int count) {
+        this.threshold = count;
+        this.currentCount = new AtomicLong();
+    }
+
+    @Override
+    public Action evict(Event<T> event) {
+        /*
+         * atomically decrement the count if its greater than threshold and
+         * return if the event should be evicted
+         */
+        while (true) {
+            long curVal = currentCount.get();
+            if (curVal > threshold) {
+                if (currentCount.compareAndSet(curVal, curVal - 1)) {
+                    return Action.EXPIRE;
+                }
+            } else {
+                break;
+            }
+        }
+        return Action.PROCESS;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (!event.isWatermark()) {
+            currentCount.incrementAndGet();
+        }
+    }
+
+    @Override
+    public void setContext(EvictionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public EvictionContext getContext() {
+        return context;
+    }
+
+    @Override
+    public String toString() {
+        return "CountEvictionPolicy{" + "threshold=" + threshold + ", currentCount=" + currentCount
+                + '}';
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    @Override
+    public Long getState() {
+        return currentCount.get();
+    }
+
+    @Override
+    public void restoreState(Long state) {
+        currentCount.set(state);
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
new file mode 100644
index 0000000000..147ca5d314
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.evictors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionContext;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+
+/**
+ * Eviction policy that evicts events based on time duration.
+ * <p>
+ * The reference time comes from the current {@link EvictionContext} when one
+ * has been set, and from the system clock otherwise.
+ */
+@Slf4j
+public class TimeEvictionPolicy<T> implements EvictionPolicy<T, EvictionContext> {
+
+    private final long windowLength;
+    protected volatile EvictionContext evictionContext;
+    /*
+     * Window length adjustment computed in setContext. It is deliberately written
+     * before the volatile evictionContext and read after it, so a thread that
+     * observes a new context also observes the delta computed for it.
+     */
+    private long delta;
+
+    /**
+     * Constructs a TimeEvictionPolicy that evicts events older
+     * than the given window length in millis
+     *
+     * @param windowLength the duration in milliseconds
+     */
+    public TimeEvictionPolicy(long windowLength) {
+        this.windowLength = windowLength;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Expires the event when its age relative to the reference time is at least
+     * the window length plus the current delta, keeps it when its timestamp is
+     * ahead of the reference time, and processes it otherwise.
+     */
+    @Override
+    public EvictionPolicy.Action evict(Event<T> event) {
+        // read the volatile field once so the null check and the use see the same context
+        EvictionContext current = evictionContext;
+        long now = current == null ? System.currentTimeMillis() : current.getReferenceTime();
+        long diff = now - event.getTimestamp();
+        if (diff >= (windowLength + delta)) {
+            return EvictionPolicy.Action.EXPIRE;
+        } else if (diff < 0) { // do not process events beyond current ts
+            return Action.KEEP;
+        }
+        return Action.PROCESS;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        // NOOP
+    }
+
+    /**
+     * Installs a new eviction context and, when the context carries a sliding
+     * interval, recomputes the window length adjustment (delta) used by
+     * {@link #evict(Event)}.
+     *
+     * @param context the new context
+     */
+    @Override
+    public void setContext(EvictionContext context) {
+        EvictionContext prevContext = evictionContext;
+        // compute window length adjustment (delta) to account for time drift
+        if (context.getSlidingInterval() != null) {
+            if (prevContext == null) {
+                delta = Integer.MAX_VALUE; // consider all events for the initial window
+            } else {
+                delta = context.getReferenceTime() - prevContext.getReferenceTime()
+                        - context.getSlidingInterval();
+                if (Math.abs(delta) > 100) {
+                    log.warn("Possible clock drift or long running computation in window; "
+                            + "Previous eviction time: {}, current eviction time: {}",
+                            prevContext.getReferenceTime(), context.getReferenceTime());
+                }
+            }
+        }
+        // publish the context last: this volatile write makes the delta above visible with it
+        evictionContext = context;
+    }
+
+    @Override
+    public EvictionContext getContext() {
+        return evictionContext;
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    /**
+     * @return the current eviction context, which is this policy's saved state
+     */
+    @Override
+    public EvictionContext getState() {
+        return evictionContext;
+    }
+
+    /**
+     * Replaces the current eviction context with a previously saved one.
+     * The delta is left untouched.
+     */
+    @Override
+    public void restoreState(EvictionContext state) {
+        this.evictionContext = state;
+    }
+
+    @Override
+    public String toString() {
+        return "TimeEvictionPolicy{" + "windowLength=" + windowLength + ", evictionContext="
+                + evictionContext + '}';
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
new file mode 100644
index 0000000000..9c052efc2f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkCountEvictionPolicy.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.evictors;
+
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionContext;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An eviction policy that tracks count based on watermark ts and
+ * evicts events up to the watermark based on a threshold count.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountEvictionPolicy<T>
+        implements EvictionPolicy<T, Pair<Long, Long>> {
+    protected final int threshold;
+    protected final AtomicLong currentCount;
+    private EvictionContext context;
+
+    // number of events answered with PROCESS since the last setContext/reset
+    private volatile long processed;
+
+    /**
+     * @param count the threshold above which events are expired
+     */
+    public WatermarkCountEvictionPolicy(int count) {
+        threshold = count;
+        currentCount = new AtomicLong();
+    }
+
+    /**
+     * Decides what to do with an event for the current context.
+     * <p>
+     * Without a context every event is held back (STOP). With a context, an
+     * event whose timestamp is at or before the reference time is expired or
+     * processed by the count check as long as fewer events than the current
+     * count have been processed; any other event is kept.
+     */
+    @Override
+    public EvictionPolicy.Action evict(Event<T> event) {
+        // fetch the context once so the null check and the use see the same object
+        EvictionContext current = getContext();
+        if (current == null) {
+            //It is possible to get asked about eviction before we have a context, due to WindowManager
+            // .compactWindow.
+            //In this case we should hold on to all the events. When the first watermark is received,
+            // the context will be set,
+            //and the events will be reevaluated for eviction
+            return Action.STOP;
+        }
+
+        Action action;
+        if (event.getTimestamp() <= current.getReferenceTime() && processed < currentCount.get()) {
+            action = doEvict(event);
+            if (action == Action.PROCESS) {
+                // NOTE(review): increment of a volatile is not atomic; presumably evict is
+                // only called from one thread at a time - confirm against WindowManager.
+                ++processed;
+            }
+        } else {
+            action = Action.KEEP;
+        }
+        return action;
+    }
+
+    private Action doEvict(Event<T> event) {
+        /*
+         * atomically decrement the count if its greater than threshold and
+         * return if the event should be evicted
+         */
+        while (true) {
+            long curVal = currentCount.get();
+            if (curVal > threshold) {
+                if (currentCount.compareAndSet(curVal, curVal - 1)) {
+                    return Action.EXPIRE;
+                }
+            } else {
+                break;
+            }
+        }
+        return Action.PROCESS;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        // NOOP
+    }
+
+    @Override
+    public EvictionContext getContext() {
+        return context;
+    }
+
+    /**
+     * Installs a new context and re-seeds the counters: the current count is
+     * taken from the context when it provides one, otherwise it becomes the
+     * number processed so far plus the context's sliding count. The processed
+     * counter is then cleared.
+     */
+    @Override
+    public void setContext(EvictionContext context) {
+        this.context = context;
+        if (context.getCurrentCount() != null) {
+            currentCount.set(context.getCurrentCount());
+        } else {
+            currentCount.set(processed + context.getSlidingCount());
+        }
+        processed = 0;
+    }
+
+    @Override
+    public void reset() {
+        processed = 0;
+    }
+
+    /**
+     * @return a pair of (current count, processed count)
+     */
+    @Override
+    public Pair<Long, Long> getState() {
+        return Pair.of(currentCount.get(), processed);
+    }
+
+    /**
+     * Restores the counters from a pair of (current count, processed count).
+     */
+    @Override
+    public void restoreState(Pair<Long, Long> state) {
+        currentCount.set(state.getLeft());
+        processed = state.getRight();
+    }
+
+    @Override
+    public String toString() {
+        // This class has no policy superclass, so super.toString() would only print
+        // Object's identity hash; report the actual state instead.
+        return "WatermarkCountEvictionPolicy{" + "threshold=" + threshold + ", currentCount="
+                + currentCount + ", processed=" + processed + '}';
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkTimeEvictionPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkTimeEvictionPolicy.java
new file mode 100644
index 0000000000..47e6d66a41
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/evictors/WatermarkTimeEvictionPolicy.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.evictors;
+
+import org.apache.pulsar.functions.windowing.Event;
+
+/**
+ * An eviction policy that evicts events based on time duration taking
+ * watermark time and event lag into account.
+ */
+public class WatermarkTimeEvictionPolicy<T> extends TimeEvictionPolicy<T> {
+    private final long lag;
+
+    /**
+     * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+     * than the given window length in millis.
+     *
+     * @param windowLength the window length in milliseconds
+     */
+    public WatermarkTimeEvictionPolicy(long windowLength) {
+        this(windowLength, Long.MAX_VALUE);
+    }
+
+    /**
+     * Constructs a WatermarkTimeEvictionPolicy that evicts events older
+     * than the given window length in millis. The lag parameter
+     * can be used in the case of event based ts to break the queue
+     * scan early.
+     *
+     * @param windowLength the window length in milliseconds
+     * @param lag the max event lag in milliseconds
+     */
+    public WatermarkTimeEvictionPolicy(long windowLength, long lag) {
+        super(windowLength);
+        this.lag = lag;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Keeps events with future ts in the queue for processing in the next
+     * window. If the ts difference is more than the lag, stops scanning
+     * the queue for the current window.
+     */
+    @Override
+    public Action evict(Event<T> event) {
+        if (evictionContext == null) {
+            //It is possible to get asked about eviction before we have a context, due to WindowManager
+            // .compactWindow.
+            //In this case we should hold on to all the events. When the first watermark is received,
+            // the context will be set,
+            //and the events will be reevaluated for eviction
+            return Action.STOP;
+        }
+
+        long referenceTime = evictionContext.getReferenceTime();
+        long diff = referenceTime - event.getTimestamp();
+        if (diff < -lag) {
+            return Action.STOP;
+        } else if (diff < 0) {
+            return Action.KEEP;
+        } else {
+            return super.evict(event);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkTimeEvictionPolicy{" + "lag=" + lag + "} " + super.toString();
+    }
+
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/CountTriggerPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/CountTriggerPolicy.java
new file mode 100644
index 0000000000..52a6b310c4
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/CountTriggerPolicy.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.triggers;
+
+import org.apache.pulsar.functions.windowing.DefaultEvictionContext;
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+import org.apache.pulsar.functions.windowing.TriggerHandler;
+import org.apache.pulsar.functions.windowing.TriggerPolicy;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()}
+ * when the count threshold is hit.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class CountTriggerPolicy<T> implements TriggerPolicy<T, Integer> {
+    private final int count;
+    private final AtomicInteger currentCount;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T, ?> evictionPolicy;
+    private boolean started;
+
+    public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy<T, ?>
+            evictionPolicy) {
+        this.count = count;
+        this.currentCount = new AtomicInteger();
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        this.started = false;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        if (started && !event.isWatermark()) {
+            if (currentCount.incrementAndGet() >= count) {
+                evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
+                handler.onTrigger();
+            }
+        }
+    }
+
+    @Override
+    public void reset() {
+        currentCount.set(0);
+    }
+
+    @Override
+    public void start() {
+        started = true;
+    }
+
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+
+    @Override
+    public Integer getState() {
+        return currentCount.get();
+    }
+
+    @Override
+    public void restoreState(Integer state) {
+        currentCount.set(state);
+    }
+
+    @Override
+    public String toString() {
+        return "CountTriggerPolicy{" + "count=" + count + ", currentCount=" + currentCount
+                + ", started=" + started + '}';
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/TimeTriggerPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/TimeTriggerPolicy.java
new file mode 100644
index 0000000000..bc40f5329e
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/TimeTriggerPolicy.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.triggers;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.ThreadContext;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.windowing.DefaultEvictionContext;
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+import org.apache.pulsar.functions.windowing.TriggerHandler;
+import org.apache.pulsar.functions.windowing.TriggerPolicy;
+import org.apache.pulsar.functions.windowing.WindowUtils;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Invokes {@link TriggerHandler#onTrigger()} after the duration.
+ */
+
+@Slf4j
+public class TimeTriggerPolicy<T> implements TriggerPolicy<T, Void> {
+
+    private long duration;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T, ?> evictionPolicy;
+    private ScheduledFuture<?> executorFuture;
+    private final ScheduledExecutorService executor;
+    private Context context;
+
+    public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy<T, ?>
+            evictionPolicy, Context context) {
+        this.duration = millis;
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("time-trigger-policy-%d")
+                .setDaemon(true)
+                .build();
+        this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+        this.context = context;
+    }
+
+    @Override
+    public void track(Event<T> event) {
+        checkFailures();
+    }
+
+    @Override
+    public void reset() {
+
+    }
+
+    @Override
+    public void start() {
+        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void shutdown() {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException ie) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "TimeTriggerPolicy{" + "duration=" + duration + '}';
+    }
+
+    private Runnable newTriggerTask() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                // initialize the thread context
+                ThreadContext.put("function", WindowUtils.getFullyQualifiedName(
+                        context.getTenant(), context.getNamespace(), context.getFunctionName()));
+                // do not process current timestamp since tuples might arrive while the trigger is executing
+                long now = System.currentTimeMillis() - 1;
+                try {
+                    /*
+                     * set the current timestamp as the reference time for the eviction policy
+                     * to evict the events
+                     */
+                    evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
+                    handler.onTrigger();
+                } catch (Throwable th) {
+                    log.error("handler.onTrigger failed ", th);
+                    /*
+                     * propagate it so that task gets canceled and the exception
+                     * can be retrieved from executorFuture.get()
+                     */
+                    throw th;
+                }
+            }
+        };
+    }
+
+    private void checkFailures() {
+        if (executorFuture != null && executorFuture.isDone()) {
+            try {
+                executorFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Got exception in timer trigger policy ", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public Void getState() {
+        return null;
+    }
+
+    @Override
+    public void restoreState(Void state) {
+
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkCountTriggerPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkCountTriggerPolicy.java
new file mode 100644
index 0000000000..26a9ce7379
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkCountTriggerPolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.triggers;
+
+import org.apache.pulsar.functions.windowing.DefaultEvictionContext;
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+import org.apache.pulsar.functions.windowing.TriggerHandler;
+import org.apache.pulsar.functions.windowing.TriggerPolicy;
+import org.apache.pulsar.functions.windowing.WindowManager;
+
+import java.util.List;
+
+/**
+ * Trigger policy for count based sliding intervals that is driven by watermark
+ * events. For every timestamp returned by
+ * {@link WindowManager#getSlidingCountTimestamps} it hands a fresh context to the
+ * eviction policy and then fires the {@link TriggerHandler}.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+public class WatermarkCountTriggerPolicy<T> implements TriggerPolicy<T, Long> {
+    /** Sliding interval count; passed to the window manager and to every eviction context. */
+    private final int count;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T, ?> evictionPolicy;
+    private final WindowManager<T> windowManager;
+    /** Timestamp of the most recent trigger; this is the value exposed as policy state. */
+    private volatile long lastProcessedTs;
+    /** Stays {@code false} (the field default) until {@link #start()} is called. */
+    private boolean started;
+
+    /**
+     * Creates a policy that is not started yet.
+     *
+     * @param count          the sliding interval count
+     * @param handler        the handler to invoke for each pending window
+     * @param evictionPolicy the eviction policy that receives the context before each trigger
+     * @param windowManager  the window manager queried for the pending timestamps
+     */
+    public WatermarkCountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy<T, ?>
+            evictionPolicy, WindowManager<T> windowManager) {
+        this.windowManager = windowManager;
+        this.evictionPolicy = evictionPolicy;
+        this.handler = handler;
+        this.count = count;
+    }
+
+    /**
+     * Reacts to watermark events only, and only once the policy has been started;
+     * every other event is ignored.
+     *
+     * @param event the event to inspect
+     */
+    @Override
+    public void track(Event<T> event) {
+        if (!started || !event.isWatermark()) {
+            return;
+        }
+        handleWaterMarkEvent(event);
+    }
+
+    @Override
+    public void reset() {
+        // nothing to reset
+    }
+
+    @Override
+    public void start() {
+        started = true;
+    }
+
+    @Override
+    public void shutdown() {
+        // nothing to release
+    }
+
+    /**
+     * Fires the handler once per timestamp that the window manager reports between
+     * the last processed timestamp and the timestamp of the given watermark.
+     *
+     * @param waterMarkEvent the watermark event
+     */
+    private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
+        final long upperBoundTs = waterMarkEvent.getTimestamp();
+        final List<Long> pendingTs =
+                windowManager.getSlidingCountTimestamps(lastProcessedTs, upperBoundTs, count);
+        for (long triggerTs : pendingTs) {
+            // the context must be in place before the handler runs
+            evictionPolicy.setContext(
+                    new DefaultEvictionContext(triggerTs, null, Long.valueOf(count)));
+            handler.onTrigger();
+            lastProcessedTs = triggerTs;
+        }
+    }
+
+    /**
+     * @return the timestamp of the most recent trigger
+     */
+    @Override
+    public Long getState() {
+        return Long.valueOf(lastProcessedTs);
+    }
+
+    /**
+     * @param state the last processed timestamp to continue from
+     */
+    @Override
+    public void restoreState(Long state) {
+        lastProcessedTs = state.longValue();
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("WatermarkCountTriggerPolicy{")
+                .append("count=").append(count)
+                .append(", lastProcessedTs=").append(lastProcessedTs)
+                .append(", started=").append(started)
+                .append('}')
+                .toString();
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java
new file mode 100644
index 0000000000..22722bf887
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing.triggers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.windowing.DefaultEvictionContext;
+import org.apache.pulsar.functions.windowing.Event;
+import org.apache.pulsar.functions.windowing.EvictionPolicy;
+import org.apache.pulsar.functions.windowing.TriggerHandler;
+import org.apache.pulsar.functions.windowing.TriggerPolicy;
+import org.apache.pulsar.functions.windowing.WindowManager;
+
+/**
+ * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window
+ * interval that has events to be processed up to the watermark ts.
+ *
+ * @param <T> the type of event tracked by this policy.
+ */
+@Slf4j
+public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T, Long> {
+    /** Amount (ms) by which the window end ts advances after a trigger that fired. */
+    private final long slidingIntervalMs;
+    private final TriggerHandler handler;
+    private final EvictionPolicy<T, ?> evictionPolicy;
+    private final WindowManager<T> windowManager;
+    /** End ts of the next window to evaluate; this is the value saved and restored as state. */
+    private volatile long nextWindowEndTs;
+    /** Watermark events are ignored until {@link #start()} has been called. */
+    private boolean started;
+
+    /**
+     * Creates a policy that is not started yet.
+     *
+     * @param slidingIntervalMs the sliding interval in milliseconds
+     * @param handler           the handler to invoke for each pending window
+     * @param evictionPolicy    the eviction policy that receives the context before each trigger
+     * @param windowManager     the window manager queried for event counts and timestamps
+     */
+    public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler,
+                                      EvictionPolicy<T, ?> evictionPolicy, WindowManager<T>
+                                              windowManager) {
+        this.slidingIntervalMs = slidingIntervalMs;
+        this.handler = handler;
+        this.evictionPolicy = evictionPolicy;
+        this.windowManager = windowManager;
+        this.started = false;
+    }
+
+    /**
+     * Processes the event if it is a watermark and the policy has been started;
+     * all other events are ignored.
+     *
+     * @param event the event to inspect
+     */
+    @Override
+    public void track(Event<T> event) {
+        if (started && event.isWatermark()) {
+            handleWaterMarkEvent(event);
+        }
+    }
+
+    @Override
+    public void reset() {
+        // NOOP
+    }
+
+    @Override
+    public void start() {
+        started = true;
+    }
+
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+
+    /**
+     * Invokes the trigger for all pending windows up to the
+     * watermark timestamp. The end ts of the window is set
+     * in the eviction policy context so that the events falling
+     * within that window can be processed.
+     *
+     * @param event the watermark event
+     */
+    private void handleWaterMarkEvent(Event<T> event) {
+        long watermarkTs = event.getTimestamp();
+        long windowEndTs = nextWindowEndTs;
+        // parameterized logging: nothing is formatted unless debug is enabled
+        log.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
+        while (windowEndTs <= watermarkTs) {
+            long currentCount = windowManager.getEventCount(windowEndTs);
+            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
+            if (handler.onTrigger()) {
+                windowEndTs += slidingIntervalMs;
+            } else {
+                /*
+                 * No events were found in the previous window interval.
+                 * Scan through the events in the queue to find the next
+                 * window intervals based on event ts.
+                 */
+                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
+                log.debug("Next aligned window end ts {}", ts);
+                if (ts == Long.MAX_VALUE) {
+                    log.debug("No events to process between {} and watermark ts {}",
+                            windowEndTs, watermarkTs);
+                    break;
+                }
+                windowEndTs = ts;
+            }
+        }
+        // remember where to resume when the next watermark arrives
+        nextWindowEndTs = windowEndTs;
+    }
+
+    /**
+     * Computes the next window by scanning the events in the window and
+     * finds the next aligned window between the startTs and endTs. Return the end ts
+     * of the next aligned window, i.e. the ts when the window should fire.
+     *
+     * @param startTs the start timestamp (excluding)
+     * @param endTs the end timestamp (including)
+     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there
+     * are no more events to be processed.
+     */
+    private long getNextAlignedWindowTs(long startTs, long endTs) {
+        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
+        // Long.MAX_VALUE is passed through unchanged, as is a ts already on an interval boundary
+        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
+            return nextTs;
+        }
+        // otherwise round up to the next multiple of the sliding interval
+        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
+    }
+
+    /**
+     * @return the end ts of the next window to evaluate
+     */
+    @Override
+    public Long getState() {
+        return nextWindowEndTs;
+    }
+
+    /**
+     * @param state the window end ts to continue from
+     */
+    @Override
+    public void restoreState(Long state) {
+        nextWindowEndTs = state;
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkTimeTriggerPolicy{" + "slidingIntervalMs=" + slidingIntervalMs
+                + ", nextWindowEndTs=" + nextWindowEndTs + ", started=" + started + '}';
+    }
+}
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index d1a249620b..e6c8cd8e8c 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xcb\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\x07 \x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t \x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"\xf1\x01\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12M\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntry\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"U\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\r\n\x05topic\x18\x04 \x01(\t\x12\x16\n\x0eserDeClassName\x18\x05 \x01(\t\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 
\x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
+  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"\x95\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61utoAck\x18\t \x01(\x08\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\"\x88\x02\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12M\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntry\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"l\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\xa1\x01\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 
\x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02*,\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=1180,
-  serialized_end=1259,
+  serialized_start=1225,
+  serialized_end=1304,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=1261,
-  serialized_end=1305,
+  serialized_start=1306,
+  serialized_end=1350,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -116,30 +116,37 @@
   ],
   containing_type=None,
   options=None,
-  serialized_start=454,
-  serialized_end=485,
+  serialized_start=453,
+  serialized_end=484,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
 
-_FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
-  name='UserConfigEntry',
-  full_name='proto.FunctionDetails.UserConfigEntry',
+_RESOURCES = _descriptor.Descriptor(
+  name='Resources',
+  full_name='proto.Resources',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='proto.FunctionDetails.UserConfigEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      name='cpu', full_name='proto.Resources.cpu', index=0,
+      number=1, type=1, cpp_type=5, label=1,
+      has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='value', full_name='proto.FunctionDetails.UserConfigEntry.value', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      name='ram', full_name='proto.Resources.ram', index=1,
+      number=2, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='disk', full_name='proto.Resources.disk', index=2,
+      number=3, type=3, cpp_type=2, label=1,
+      has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
@@ -149,16 +156,17 @@
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=403,
-  serialized_end=452,
+  serialized_start=25,
+  serialized_end=76,
 )
 
+
 _FUNCTIONDETAILS = _descriptor.Descriptor(
   name='FunctionDetails',
   full_name='proto.FunctionDetails',
@@ -210,8 +218,8 @@
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
-      number=7, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
+      number=7, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
@@ -250,10 +258,17 @@
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='resources', full_name='proto.FunctionDetails.resources', index=12,
+      number=13, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
-  nested_types=[_FUNCTIONDETAILS_USERCONFIGENTRY, ],
+  nested_types=[],
   enum_types=[
     _FUNCTIONDETAILS_RUNTIME,
   ],
@@ -263,8 +278,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=26,
-  serialized_end=485,
+  serialized_start=79,
+  serialized_end=484,
 )
 
 
@@ -301,8 +316,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=668,
-  serialized_end=729,
+  serialized_start=690,
+  serialized_end=751,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -327,14 +342,21 @@
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=2,
+      name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3,
       number=3, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=3,
+      name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
       number=4, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -352,8 +374,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=488,
-  serialized_end=729,
+  serialized_start=487,
+  serialized_end=751,
 )
 
 
@@ -379,15 +401,22 @@
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='topic', full_name='proto.SinkSpec.topic', index=2,
-      number=4, type=9, cpp_type=9, label=1,
+      name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
+      number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=3,
-      number=5, type=9, cpp_type=9, label=1,
+      name='topic', full_name='proto.SinkSpec.topic', index=3,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4,
+      number=4, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -404,8 +433,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=731,
-  serialized_end=816,
+  serialized_start=753,
+  serialized_end=861,
 )
 
 
@@ -435,8 +464,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=818,
-  serialized_end=864,
+  serialized_start=863,
+  serialized_end=909,
 )
 
 
@@ -487,8 +516,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=867,
-  serialized_end=1028,
+  serialized_start=912,
+  serialized_end=1073,
 )
 
 
@@ -525,8 +554,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1030,
-  serialized_end=1111,
+  serialized_start=1075,
+  serialized_end=1156,
 )
 
 
@@ -563,16 +592,15 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1113,
-  serialized_end=1178,
+  serialized_start=1158,
+  serialized_end=1223,
 )
 
-_FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
-_FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = _FUNCTIONDETAILS_USERCONFIGENTRY
 _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME
 _FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
 _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
+_FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
 _SOURCESPEC.fields_by_name['subscriptionType'].enum_type = _SUBSCRIPTIONTYPE
@@ -581,6 +609,7 @@
 _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA
 _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
 _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
+DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES
 DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
 DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
 DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
@@ -592,20 +621,19 @@
 DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
-FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict(
+Resources = _reflection.GeneratedProtocolMessageType('Resources', (_message.Message,), dict(
+  DESCRIPTOR = _RESOURCES,
+  __module__ = 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.Resources)
+  ))
+_sym_db.RegisterMessage(Resources)
 
-  UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict(
-    DESCRIPTOR = _FUNCTIONDETAILS_USERCONFIGENTRY,
-    __module__ = 'Function_pb2'
-    # @@protoc_insertion_point(class_scope:proto.FunctionDetails.UserConfigEntry)
-    ))
-  ,
+FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict(
   DESCRIPTOR = _FUNCTIONDETAILS,
   __module__ = 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionDetails)
   ))
 _sym_db.RegisterMessage(FunctionDetails)
-_sym_db.RegisterMessage(FunctionDetails.UserConfigEntry)
 
 SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), dict(
 
@@ -660,8 +688,6 @@
 
 DESCRIPTOR.has_options = True
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function'))
-_FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True
-_FUNCTIONDETAILS_USERCONFIGENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index a841152366..e17b296a49 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -25,6 +25,7 @@
 
 import time
 import os
+import json
 
 import pulsar
 import util
@@ -59,6 +60,7 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers)
     self.current_message_id = None
     self.current_input_topic_name = None
     self.current_start_time = None
+    self.user_config = json.loads(instance_config.function_details.userConfig);
 
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
@@ -94,13 +96,13 @@ def get_logger(self):
     return self.log
 
   def get_user_config_value(self, key):
-    if key in self.instance_config.function_details.userConfig:
-      return str(self.instance_config.function_details.userConfig[key])
+    if key in self.user_config:
+      return self.user_config[key]
     else:
       return None
   
   def get_user_config_map(self):
-    return self.instance_config.function_details.userConfig
+    return self.user_config
 
   def record_metric(self, metric_name, metric_value):
     if not metric_name in self.accumulated_metrics:
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 15d264328a..0c9c5bf3f9 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -116,9 +116,7 @@ def main():
   else:
     function_details.autoAck = False
   if args.user_config != None and len(args.user_config) != 0:
-    user_config = json.loads(args.user_config)
-    for (key, value) in user_config.items():
-      function_details.userConfig[str(key)] = str(value)
+    function_details.userConfig = args.user_config
 
   pulsar_client = pulsar.Client(args.pulsar_serviceurl)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
new file mode 100644
index 0000000000..b06fd4b199
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.windowing;
+
+import org.apache.pulsar.functions.api.Context;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link WaterMarkEventGenerator}
+ */
+public class WaterMarkEventGeneratorTest {
+    private WaterMarkEventGenerator<Integer> waterMarkEventGenerator;
+    private WindowManager<Integer> windowManager;
+    private List<Event<Integer>> eventList = new ArrayList<>();
+    private Context context;
+
+    @BeforeMethod
+    public void setUp() {
+        windowManager = new WindowManager<Integer>(null, new LinkedList<>()) {
+            @Override
+            public void add(Event<Integer> event) {
+                eventList.add(event);
+            }
+        };
+
+        context = Mockito.mock(Context.class);
+        Mockito.doReturn("test-function").when(context).getFunctionName();
+        Mockito.doReturn("test-namespace").when(context).getNamespace();
+        Mockito.doReturn("test-tenant").when(context).getTenant();
+        // set watermark interval to a high value and trigger manually to fix timing issues
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L, 5, Collections
+                .singleton("s1"), context);
+//        waterMarkEventGenerator.start();
+    }
+
+    @AfterMethod
+    public void tearDown() {
+//        waterMarkEventGenerator.shutdown();
+        eventList.clear();
+    }
+
+    @Test
+    public void testTrackSingleStream() throws Exception {
+        waterMarkEventGenerator.track("s1", 100);
+        waterMarkEventGenerator.track("s1", 110);
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+    }
+
+    @Test
+    public void testTrackSingleStreamOutOfOrder() throws Exception {
+        waterMarkEventGenerator.track("s1", 100);
+        waterMarkEventGenerator.track("s1", 110);
+        waterMarkEventGenerator.track("s1", 104);
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+    }
+
+    @Test
+    public void testTrackTwoStreams() throws Exception {
+        Set<String> streams = new HashSet<>();
+        streams.add("s1");
+        streams.add("s2");
+        waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L,
+                5, streams, context);
+        waterMarkEventGenerator.start();
+
+        waterMarkEventGenerator.track("s1", 100);
+        waterMarkEventGenerator.track("s1", 110);
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.isEmpty());
+        waterMarkEventGenerator.track("s2", 95);
+        waterMarkEventGenerator.track("s2", 98);
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(93, eventList.get(0).getTimestamp());
+    }
+
+    @Test
+    public void testNoEvents() throws Exception {
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.isEmpty());
+    }
+
+    @Test
+    public void testLateEvent() throws Exception {
+        assertTrue(waterMarkEventGenerator.track("s1", 100));
+        assertTrue(waterMarkEventGenerator.track("s1", 110));
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(105, eventList.get(0).getTimestamp());
+        eventList.clear();
+        assertTrue(waterMarkEventGenerator.track("s1", 105));
+        assertTrue(waterMarkEventGenerator.track("s1", 106));
+        assertTrue(waterMarkEventGenerator.track("s1", 115));
+        assertFalse(waterMarkEventGenerator.track("s1", 104));
+        waterMarkEventGenerator.run();
+        assertTrue(eventList.get(0).isWatermark());
+        assertEquals(110, eventList.get(0).getTimestamp());
+    }
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
new file mode 100644
index 0000000000..33565bc07d
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.windowing;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.WindowConfig;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+/**
+ * Unit tests for {@link WindowFunctionExecutor}
+ */
+@Slf4j
+public class WindowFunctionExecutorTest {
+
+    /**
+     * Executor under test: records every window handed to {@link #process(Window, WindowContext)}
+     * so the tests can inspect which windows were triggered and what they contained.
+     */
+    private static class TestWindowFunctionExecutor extends WindowFunctionExecutor<Long, Long> {
+
+        /** Windows received so far, in the order they were processed. Never reassigned. */
+        final List<Window<Long>> windows = new ArrayList<>();
+
+        /**
+         * Stores the window for later inspection.
+         *
+         * @param inputWindow the window passed in by the executor
+         * @param context the window context (unused)
+         * @return always {@code null}; the tests do not use the function output
+         */
+        @Override
+        public Long process(Window<Long> inputWindow, WindowContext context) throws Exception {
+            windows.add(inputWindow);
+            return null;
+        }
+    }
+
+    /**
+     * No-op function over a collection of {@code Long} values. The tests pass its class name to
+     * {@code WindowConfig.setActualWindowFunctionClassName} as the well-typed window function.
+     */
+    private static class TestFunction implements Function<Collection<Long>, Long> {
+
+        /** Ignores the input and returns {@code null}. */
+        @Override
+        public Long apply(Collection<Long> longs) {
+            return null;
+        }
+    }
+
+    /**
+     * No-op function over a single {@code Long} instead of a collection. It is configured by
+     * {@code testExecuteWithWrongJavaWindowFunctionType}, which expects an
+     * {@link IllegalArgumentException}.
+     */
+    private static class TestWrongFunction implements Function<Long, Long> {
+
+        /** Ignores the input and returns {@code null}. */
+        @Override
+        public Long apply(Long aLong) {
+            return null;
+        }
+    }
+
+    /**
+     * Timestamp extractor that uses the {@code Long} input value itself as the event timestamp.
+     */
+    private static class TestTimestampExtractor implements TimestampExtractor<Long> {
+        /** Returns the input unchanged (auto-unboxed to {@code long}). */
+        @Override
+        public long extractTimestamp(Long input) {
+            return input;
+        }
+    }
+
+    /**
+     * Timestamp extractor typed for {@code String} input, whereas the executor under test handles
+     * {@code Long}. It is configured by {@code testExecuteWithWrongWrongTimestampExtractorType},
+     * which expects a {@link RuntimeException}.
+     */
+    private static class TestWrongTimestampExtractor implements TimestampExtractor<String> {
+        /**
+         * Parses the input as a decimal {@code long}.
+         *
+         * @throws NumberFormatException if the input is not a parsable {@code long}
+         */
+        @Override
+        public long extractTimestamp(String input) {
+            // parseLong yields the primitive directly; valueOf would box and immediately unbox
+            return Long.parseLong(input);
+        }
+    }
+
+
+    private TestWindowFunctionExecutor testWindowedPulsarFunction;
+    private Context context;
+    private WindowConfig windowConfig;
+
+    @BeforeMethod
+    public void setUp() {
+        testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+        context = Mockito.mock(Context.class);
+        Mockito.doReturn("test-function").when(context).getFunctionName();
+        Mockito.doReturn("test-namespace").when(context).getNamespace();
+        Mockito.doReturn("test-tenant").when(context).getTenant();
+
+        windowConfig = new WindowConfig();
+        windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+        windowConfig.setWindowLengthDurationMs(20L);
+        windowConfig.setSlidingIntervalDurationMs(10L);
+        windowConfig.setMaxLagMs(5L);
+        // trigger manually to avoid timing issues
+        windowConfig.setWatermarkEmitIntervalMs(100000L);
+        windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+        Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class))).when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+        Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+        Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+        Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+        Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+    }
+
+    /**
+     * Shuts down the executor currently held in {@code testWindowedPulsarFunction} after every test.
+     * NOTE(review): several tests replace that field inside a loop, so only the last instance
+     * created by such a test is shut down here -- confirm whether earlier ones need it too.
+     */
+    @AfterMethod
+    public void cleanUp() {
+        testWindowedPulsarFunction.shutdown();
+    }
+
+    /**
+     * Processing must fail with a {@link RuntimeException} when the configured timestamp extractor
+     * is typed for {@code String} input.
+     * NOTE(review): the config below sets neither a window length nor a window function class;
+     * confirm the exception really originates from the extractor type check and not from an
+     * earlier validation step.
+     */
+    @Test(expectedExceptions = RuntimeException.class)
+    public void testExecuteWithWrongWrongTimestampExtractorType() throws Exception {
+        WindowConfig wrongExtractorConfig = new WindowConfig();
+        wrongExtractorConfig.setTimestampExtractorClassName(TestWrongTimestampExtractor.class.getName());
+
+        Gson gson = new Gson();
+        Map<?, ?> configMap = gson.fromJson(gson.toJson(wrongExtractorConfig), Map.class);
+        Mockito.doReturn(Optional.of(configMap))
+                .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+        testWindowedPulsarFunction.process(10L, context);
+    }
+
+    /**
+     * Processing must fail with an {@link IllegalArgumentException} when the configured window
+     * function takes a single {@code Long} instead of a collection.
+     * NOTE(review): the config below sets no window length; confirm the exception really
+     * originates from the function type check and not from an earlier validation step.
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testExecuteWithWrongJavaWindowFunctionType() throws Exception {
+        WindowConfig wrongFunctionConfig = new WindowConfig();
+        wrongFunctionConfig.setActualWindowFunctionClassName(TestWrongFunction.class.getName());
+
+        Gson gson = new Gson();
+        Map<?, ?> configMap = gson.fromJson(gson.toJson(wrongFunctionConfig), Map.class);
+        Mockito.doReturn(Optional.of(configMap))
+                .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+        testWindowedPulsarFunction.process(10L, context);
+    }
+
+    /**
+     * Feeds six timestamped events through the executor configured in {@code setUp}, emits the
+     * watermark once by hand, and checks that three windows were triggered with the expected
+     * leading elements.
+     */
+    @Test
+    public void testExecuteWithTs() throws Exception {
+        long[] timestamps = {603, 605, 607, 618, 626, 636};
+        for (long ts : timestamps) {
+            testWindowedPulsarFunction.process(ts, context);
+        }
+        // setUp configures a very long emit interval, so the watermark is emitted manually here
+        testWindowedPulsarFunction.waterMarkEventGenerator.run();
+        // TestNG's assertEquals takes (actual, expected); the reversed order would invert the
+        // failure message
+        assertEquals(testWindowedPulsarFunction.windows.size(), 3);
+
+        // assertArrayEquals comes from ArrayAsserts and keeps the JUnit (expected, actual) order
+        Window<Long> first = testWindowedPulsarFunction.windows.get(0);
+        assertArrayEquals(
+                new long[]{603, 605, 607},
+                new long[]{first.get().get(0), first.get().get(1), first.get().get(2)});
+
+        Window<Long> second = testWindowedPulsarFunction.windows.get(1);
+        assertArrayEquals(
+                new long[]{603, 605, 607, 618},
+                new long[]{second.get().get(0), second.get().get(1), second.get().get(2), second.get().get(3)});
+
+        Window<Long> third = testWindowedPulsarFunction.windows.get(2);
+        assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0), third.get().get(1)});
+    }
+
+    /**
+     * A late-data topic configured without a timestamp extractor must be rejected with an
+     * {@link IllegalArgumentException} carrying the expected message.
+     */
+    @Test
+    public void testPrepareLateTupleStreamWithoutTs() throws Exception {
+        // window config with a late-data topic but no timestamp extractor class
+        WindowConfig lateTopicConfig = new WindowConfig();
+        lateTopicConfig.setWindowLengthDurationMs(20L);
+        lateTopicConfig.setSlidingIntervalDurationMs(10L);
+        lateTopicConfig.setLateDataTopic("$late");
+        lateTopicConfig.setMaxLagMs(5L);
+        lateTopicConfig.setWatermarkEmitIntervalMs(10L);
+        lateTopicConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+        Gson gson = new Gson();
+        Map<?, ?> configMap = gson.fromJson(gson.toJson(lateTopicConfig), Map.class);
+
+        // fresh mock so that the stubbing from setUp does not apply
+        context = Mockito.mock(Context.class);
+        Mockito.doReturn("test-function").when(context).getFunctionName();
+        Mockito.doReturn("test-namespace").when(context).getNamespace();
+        Mockito.doReturn("test-tenant").when(context).getTenant();
+        Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+        Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+        Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+        Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+        Mockito.doReturn(Optional.of(configMap))
+                .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+        String expectedMessage = "Late data topic can be defined only when specifying a "
+                + "timestamp extractor class";
+        try {
+            testWindowedPulsarFunction.process(10L, context);
+            // reaching this line means the invalid config was accepted
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertEquals(e.getMessage(), expectedMessage);
+        }
+    }
+
+    /**
+     * With a late-data topic configured, an event that arrives after the watermark has moved
+     * past its timestamp must be published to that topic.
+     */
+    @Test
+    public void testExecuteWithLateTupleStream() throws Exception {
+
+        windowConfig.setLateDataTopic("$late");
+        Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+        // the final timestamp (600) is older than every event that precedes it
+        long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+
+        for (long ts : timestamps) {
+            testWindowedPulsarFunction.process(ts, context);
+
+            //Update the watermark to this timestamp
+            testWindowedPulsarFunction.waterMarkEventGenerator.run();
+        }
+        // use the class logger instead of printing to stdout
+        log.debug("Windows passed to the function: {}", testWindowedPulsarFunction.windows);
+
+        long lateEvent = timestamps[timestamps.length - 1];
+        Mockito.verify(context).publish("$late", lateEvent, DefaultSerDe.class.getName());
+    }
+
+    @Test
+    public void testSettingSlidingCountWindow() throws Exception {
+        final Object[][] args = new Object[][]{
+                {-1, 10},
+                {10, -1},
+                {0, 10},
+                {10, 0},
+                {0, 0},
+                {-1, -1},
+                {5, 10},
+                {1, 1},
+                {10, 5},
+                {100, 10},
+                {100, 100},
+                {200, 100},
+                {500, 100},
+                {null, null},
+                {null, 1},
+                {1, null},
+                {null, -1},
+                {-1, null}
+        };
+
+        for (Object[] arg : args) {
+            Object arg0 = arg[0];
+            Object arg1 = arg[1];
+            try {
+
+                Integer windowLengthCount = null;
+                if (arg0 != null) {
+                    windowLengthCount = (Integer) arg0;
+                }
+                Integer slidingIntervalCount = null;
+
+                if (arg1 != null) {
+                    slidingIntervalCount = (Integer) arg1;
+                }
+                context = Mockito.mock(Context.class);
+                Mockito.doReturn("test-function").when(context).getFunctionName();
+                Mockito.doReturn("test-namespace").when(context).getNamespace();
+                Mockito.doReturn("test-tenant").when(context).getTenant();
+                Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+                Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+                Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+                windowConfig.setWindowLengthCount(windowLengthCount);
+                windowConfig.setSlidingIntervalCount(slidingIntervalCount);
+                windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+                Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                        .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+                testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+                testWindowedPulsarFunction.process(10L, context);
+
+                if (arg0 == null) {
+                    fail(String.format("Window length cannot be null -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if ((Integer) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if (arg1 != null && (Integer) arg1 <= 0) {
+                    fail(String.format("Sliding interval length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+
+                Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWindowLengthCount().intValue(),
+                        windowLengthCount.intValue());
+                // if slidingIntervalCount is null then its a tumbling windowing and slidingIntervalCount will be
+                // set to window length count
+                if (slidingIntervalCount == null) {
+                    Assert.assertEquals(
+                            testWindowedPulsarFunction.windowConfig.getSlidingIntervalCount().intValue(),
+                            windowLengthCount.intValue());
+                } else {
+                    Assert.assertEquals(
+                            testWindowedPulsarFunction.windowConfig.getSlidingIntervalCount().intValue(),
+                            slidingIntervalCount.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && arg1 != null && (Integer) arg0 > 0 && (Integer) arg1 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s "
+                            + "slidingIntervalCount: %s", e.getMessage(), arg0, arg1));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingSlidingTimeWindow() throws Exception {
+        final Object[][] args = new Object[][]{
+                {-1L, 10L},
+                {10L, -1L},
+                {0L, 10L},
+                {10L, 0L},
+                {0L, 0L},
+                {-1L, -1L},
+                {5L, 10L},
+                {1L, 1L},
+                {10L, 5L},
+                {100L, 10L},
+                {100L, 100L},
+                {200L, 100L},
+                {500L, 100L},
+                {null, null},
+                {null, 1L},
+                {1L, null},
+                {null, -1L},
+                {-1L, null}
+        };
+
+        for (Object[] arg : args) {
+            Object arg0 = arg[0];
+            Object arg1 = arg[1];
+            try {
+                Long windowLengthDuration = null;
+                if (arg0 != null) {
+                    windowLengthDuration = (Long) arg0;
+                }
+                Long slidingIntervalDuration = null;
+
+                if (arg1 != null) {
+                    slidingIntervalDuration = (Long) arg1;
+                }
+                context = Mockito.mock(Context.class);
+                Mockito.doReturn("test-function").when(context).getFunctionName();
+                Mockito.doReturn("test-namespace").when(context).getNamespace();
+                Mockito.doReturn("test-tenant").when(context).getTenant();
+                Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+                Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+                Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+                windowConfig.setWindowLengthDurationMs(windowLengthDuration);
+                windowConfig.setSlidingIntervalDurationMs(slidingIntervalDuration);
+                windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+                Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                        .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+                testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+                testWindowedPulsarFunction.process(10L, context);
+
+                if (arg0 == null) {
+                    fail(String.format("Window length cannot be null -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if ((Long) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+                if (arg1 != null && (Long) arg1 <= 0) {
+                    fail(String.format("Sliding interval length cannot be zero or less -- "
+                            + "windowLengthCount: %s slidingIntervalCount: %s", arg0, arg1));
+                }
+
+                Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWindowLengthDurationMs().longValue(),
+                        windowLengthDuration.longValue());
+                // if slidingIntervalDuration is null then its a tumbling windowing and slidingIntervalDuration will be
+                // set to window length duration
+                if (slidingIntervalDuration == null) {
+                    Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getSlidingIntervalDurationMs().longValue(),
+                            windowLengthDuration.longValue());
+                } else {
+                    Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getSlidingIntervalDurationMs().longValue(),
+                            slidingIntervalDuration.longValue());
+                }
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && arg1 != null && (Long) arg0 > 0 && (Long) arg1 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s "
+                            + "slidingIntervalDuration: %s", e.getMessage(), arg0, arg1));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingTumblingCountWindow() throws Exception {
+        final Object[] args = new Object[]{-1, 0, 1, 2, 5, 10, null};
+
+        for (Object arg : args) {
+            Object arg0 = arg;
+            try {
+
+                Integer windowLengthCount = null;
+                if (arg0 != null) {
+                    windowLengthCount = (Integer) arg0;
+                }
+
+                context = Mockito.mock(Context.class);
+                Mockito.doReturn("test-function").when(context).getFunctionName();
+                Mockito.doReturn("test-namespace").when(context).getNamespace();
+                Mockito.doReturn("test-tenant").when(context).getTenant();
+                Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+                Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+                Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+                windowConfig.setWindowLengthCount(windowLengthCount);
+                windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+                Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                        .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+                testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+                testWindowedPulsarFunction.process(10L, context);
+
+                if (arg0 == null) {
+                    fail(String.format("Window length cannot be null -- windowLengthCount: %s", arg0));
+                }
+                if ((Integer) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- windowLengthCount: %s",
+                            arg0));
+                }
+
+                Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWindowLengthCount().intValue(),
+                        windowLengthCount.intValue());
+                Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWindowLengthCount().intValue(),
+                        testWindowedPulsarFunction.windowConfig.getSlidingIntervalCount().intValue());
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && (Integer) arg0 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthCount: %s", e
+                            .getMessage(), arg0));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSettingTumblingTimeWindow() throws Exception {
+        final Object[] args = new Object[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        for (Object arg : args) {
+            Object arg0 = arg;
+            try {
+
+                Long windowLengthDuration = null;
+                if (arg0 != null) {
+                    windowLengthDuration = (Long) arg0;
+                }
+
+                context = Mockito.mock(Context.class);
+                Mockito.doReturn("test-function").when(context).getFunctionName();
+                Mockito.doReturn("test-namespace").when(context).getNamespace();
+                Mockito.doReturn("test-tenant").when(context).getTenant();
+                Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+                Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+                Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+                windowConfig.setWindowLengthDurationMs(windowLengthDuration);
+                windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+                Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                        .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+                testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+                testWindowedPulsarFunction.process(10L, context);
+
+                if (arg0 == null) {
+                    fail(String.format("Window count duration cannot be null -- windowLengthDuration: %s",
+                            arg0));
+                }
+                if ((Long) arg0 <= 0) {
+                    fail(String.format("Window length cannot be zero or less -- windowLengthDuration: %s",
+                            arg0));
+                }
+                Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWindowLengthDurationMs().longValue(),
+                        windowLengthDuration.longValue());
+                Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWindowLengthDurationMs().longValue(),
+                        testWindowedPulsarFunction.windowConfig.getSlidingIntervalDurationMs().longValue());
+            } catch (IllegalArgumentException e) {
+                if (arg0 != null && (Long) arg0 > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- windowLengthDuration: %s", e
+                            .getMessage(), arg0));
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks handling of the max-lag setting on a count window of length 1: a null value must
+     * resolve to {@code DEFAULT_MAX_LAG_MS}, a positive value must be kept as configured, and a
+     * non-positive value must be rejected with an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testSettingLagTime() throws Exception {
+        final Long[] args = new Long[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        for (Long maxLagMs : args) {
+            try {
+                context = Mockito.mock(Context.class);
+                Mockito.doReturn("test-function").when(context).getFunctionName();
+                Mockito.doReturn("test-namespace").when(context).getNamespace();
+                Mockito.doReturn("test-tenant").when(context).getTenant();
+                Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+                Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+                Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+                windowConfig.setWindowLengthCount(1);
+                windowConfig.setSlidingIntervalCount(1);
+                windowConfig.setMaxLagMs(maxLagMs);
+                windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+                Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                        .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+                testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+                testWindowedPulsarFunction.process(10L, context);
+
+                if (maxLagMs == null) {
+                    // Long.valueOf replaces the deprecated Long constructor
+                    Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs(),
+                            Long.valueOf(testWindowedPulsarFunction.DEFAULT_MAX_LAG_MS));
+                } else if (maxLagMs <= 0) {
+                    fail(String.format("Window lag cannot be zero or less -- lagTime: %s", maxLagMs));
+                } else {
+                    Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getMaxLagMs().longValue(),
+                            maxLagMs.longValue());
+                }
+            } catch (IllegalArgumentException e) {
+                // an exception is only acceptable for a non-positive lag; null falls back to the default
+                if (maxLagMs != null && maxLagMs > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- lagTime: %s",
+                            e.getMessage(), maxLagMs));
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks handling of the watermark emit interval on a count window of length 1: a null value
+     * must resolve to {@code DEFAULT_WATERMARK_EVENT_INTERVAL_MS}, a positive value must be kept
+     * as configured, and a non-positive value must be rejected with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testSettingWaterMarkInterval() throws Exception {
+        final Long[] args = new Long[]{-1L, 0L, 1L, 2L, 5L, 10L, null};
+        for (Long watermarkEmitInterval : args) {
+            try {
+                context = Mockito.mock(Context.class);
+                Mockito.doReturn("test-function").when(context).getFunctionName();
+                Mockito.doReturn("test-namespace").when(context).getNamespace();
+                Mockito.doReturn("test-tenant").when(context).getTenant();
+                Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
+                Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
+                Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
+                Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+
+                WindowConfig windowConfig = new WindowConfig();
+                windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
+                windowConfig.setWindowLengthCount(1);
+                windowConfig.setSlidingIntervalCount(1);
+                windowConfig.setWatermarkEmitIntervalMs(watermarkEmitInterval);
+                windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
+                Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class)))
+                        .when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+
+                testWindowedPulsarFunction = new TestWindowFunctionExecutor();
+                testWindowedPulsarFunction.process(10L, context);
+
+                if (watermarkEmitInterval == null) {
+                    // Long.valueOf replaces the deprecated Long constructor
+                    Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWatermarkEmitIntervalMs(),
+                            Long.valueOf(testWindowedPulsarFunction.DEFAULT_WATERMARK_EVENT_INTERVAL_MS));
+                } else if (watermarkEmitInterval <= 0) {
+                    fail(String.format("Watermark interval cannot be zero or less -- watermarkInterval: "
+                            + "%s", watermarkEmitInterval));
+                } else {
+                    Assert.assertEquals(testWindowedPulsarFunction.windowConfig.getWatermarkEmitIntervalMs().longValue(),
+                            watermarkEmitInterval.longValue());
+                }
+            } catch (IllegalArgumentException e) {
+                // an exception is only acceptable for a non-positive interval; null falls back to the default
+                if (watermarkEmitInterval != null && watermarkEmitInterval > 0) {
+                    fail(String.format("Exception: %s thrown on valid input -- watermarkInterval: %s",
+                            e.getMessage(), watermarkEmitInterval));
+                }
+            }
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowManagerTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowManagerTest.java
new file mode 100644
index 0000000000..e4949cd662
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowManagerTest.java
@@ -0,0 +1,839 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.windowing;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
+import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
+import org.apache.pulsar.functions.windowing.evictors.WatermarkCountEvictionPolicy;
+import org.apache.pulsar.functions.windowing.evictors.WatermarkTimeEvictionPolicy;
+import org.apache.pulsar.functions.windowing.triggers.CountTriggerPolicy;
+import org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy;
+import org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
+import org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link WindowManager}
+ */
+@Slf4j
+public class WindowManagerTest {
+    private WindowManager<Integer> windowManager;
+    private Listener listener;
+
+    private static final long TIMESTAMP = 1516776194873L;
+    private static final String TOPIC = "test-topic";
+
+    private static class Listener implements WindowLifecycleListener<Event<Integer>> {
+        private List<Event<Integer>> onExpiryEvents = Collections.emptyList();
+        private List<Event<Integer>> onActivationEvents = Collections.emptyList();
+        private List<Event<Integer>> onActivationNewEvents = Collections.emptyList();
+        private List<Event<Integer>> onActivationExpiredEvents = Collections.emptyList();
+
+        // all events since last clear
+        private List<List<Event<Integer>>> allOnExpiryEvents = new ArrayList<>();
+        private List<List<Event<Integer>>> allOnActivationEvents = new ArrayList<>();
+        private List<List<Event<Integer>>> allOnActivationNewEvents = new ArrayList<>();
+        private List<List<Event<Integer>>> allOnActivationExpiredEvents = new ArrayList<>();
+
+        @Override
+        public void onExpiry(List<Event<Integer>> events) {
+            onExpiryEvents = events;
+            allOnExpiryEvents.add(events);
+        }
+
+        @Override
+        public void onActivation(List<Event<Integer>> events, List<Event<Integer>> newEvents, List<Event<Integer>>
+                expired, Long timestamp) {
+            onActivationEvents = events;
+            allOnActivationEvents.add(events);
+            onActivationNewEvents = newEvents;
+            allOnActivationNewEvents.add(newEvents);
+            onActivationExpiredEvents = expired;
+            allOnActivationExpiredEvents.add(expired);
+        }
+
+        void clear() {
+            onExpiryEvents = Collections.emptyList();
+            onActivationEvents = Collections.emptyList();
+            onActivationNewEvents = Collections.emptyList();
+            onActivationExpiredEvents = Collections.emptyList();
+
+            allOnExpiryEvents.clear();
+            allOnActivationEvents.clear();
+            allOnActivationNewEvents.clear();
+            allOnActivationExpiredEvents.clear();
+        }
+    }
+
+    @BeforeMethod
+    public void setUp() { // runs before every test method: each test gets its own listener and manager
+        this.listener = new Listener();
+        this.windowManager = new WindowManager<>(this.listener, new LinkedList<>());
+    }
+
+    @AfterMethod
+    public void tearDown() { // runs after every test method: shuts down the manager built in setUp()
+        this.windowManager.shutdown();
+    }
+
+    @Test // Sliding count window: eviction policy of 5 events, trigger fires on every 2nd added event.
+    public void testCountBasedWindow() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new CountEvictionPolicy<Integer>(5);
+        TriggerPolicy<Integer, ?> triggerPolicy = new CountTriggerPolicy<Integer>(2, windowManager, evictionPolicy);
+        triggerPolicy.start();
+        windowManager.setEvictionPolicy(evictionPolicy);
+        windowManager.setTriggerPolicy(triggerPolicy);
+        windowManager.add(new EventImpl<>(1, TIMESTAMP, null));
+        windowManager.add(new EventImpl<>(2, TIMESTAMP, null));
+        // first trigger: window holds 1..2, nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(seq(1, 2), listener.onActivationEvents);
+        assertEquals(seq(1, 2), listener.onActivationNewEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        windowManager.add(new EventImpl<>(3, TIMESTAMP, null));
+        windowManager.add(new EventImpl<>(4, TIMESTAMP, null));
+        // second trigger: window holds 1..4 (still within 5), nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(seq(1, 4), listener.onActivationEvents);
+        assertEquals(seq(3, 4), listener.onActivationNewEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        windowManager.add(new EventImpl<>(5, TIMESTAMP, null));
+        windowManager.add(new EventImpl<>(6, TIMESTAMP, null));
+        // third trigger: six events were added, so event 1 expired
+        assertEquals(seq(1), listener.onExpiryEvents);
+        assertEquals(seq(2, 6), listener.onActivationEvents);
+        assertEquals(seq(5, 6), listener.onActivationNewEvents);
+        assertEquals(seq(1), listener.onActivationExpiredEvents);
+        listener.clear();
+        windowManager.add(new EventImpl<>(7, TIMESTAMP, null));
+        // a single add does not reach the trigger count of 2, so nothing is reported as expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        windowManager.add(new EventImpl<>(8, TIMESTAMP, null));
+        // fourth trigger: events 2 and 3 expired
+        assertEquals(seq(2, 3), listener.onExpiryEvents);
+        assertEquals(seq(4, 8), listener.onActivationEvents);
+        assertEquals(seq(7, 8), listener.onActivationNewEvents);
+        assertEquals(seq(2, 3), listener.onActivationExpiredEvents);
+    }
+
+    @Test // Without any trigger firing, adding EXPIRE_EVENTS_THRESHOLD events must expire those beyond the window.
+    public void testExpireThreshold() throws Exception {
+        int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        int windowLength = 5; // single source of truth for the policy, the batches and the final assertion
+        CountEvictionPolicy<Integer> countEvictionPolicy = new CountEvictionPolicy<>(windowLength);
+        windowManager.setEvictionPolicy(countEvictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy = new TimeTriggerPolicy<Integer>(Duration.ofHours(1)
+                .toMillis(), windowManager, countEvictionPolicy, null); // one-hour interval: not expected to fire here
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        for (Event<Integer> i : seq(1, windowLength)) {
+            windowManager.add(i);
+        }
+        // exactly one window's worth of events so far: nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        for (Event<Integer> i : seq(windowLength + 1, 2 * windowLength)) {
+            windowManager.add(i);
+        }
+        for (Event<Integer> i : seq(2 * windowLength + 1, threshold)) {
+            windowManager.add(i);
+        }
+        // threshold reached: all but the newest windowLength events should have been expired
+        assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents);
+    }
+
+    private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy
+                                                                            watermarkEvictionPolicy,
+                                                                    int windowLength) throws
+            Exception {
+        /*
+         * Shared scenario for the watermark-based eviction policies: such a policy must not evict
+         * events until the first watermark has been received. It cannot make a meaningful decision
+         * before then, so the safe choice is to postpone eviction, even once
+         * WindowManager.EXPIRE_EVENTS_THRESHOLD events have been added. After a second, later
+         * watermark the first batch is expected to be reported as expired.
+         */
+        int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        windowManager.setEvictionPolicy(watermarkEvictionPolicy);
+        WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, windowManager,
+                watermarkEvictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        for (Event<Integer> i : seqThreshold(1, threshold)) {
+            windowManager.add(i);
+        }
+        assertTrue(listener.onExpiryEvents.isEmpty(), "The watermark eviction policies should never evict events " +
+                "before the first "
+                + "watermark is received");
+        windowManager.add(new WaterMarkEvent<>(threshold));
+        // The events should be put in a window when the first watermark is received
+        assertEquals(seqThreshold(1, threshold), listener.onActivationEvents);
+        // Now add some more events and a new watermark, and check that the previous events are expired
+        for (Event<Integer> i : seqThreshold(threshold + 1, threshold * 2)) {
+            windowManager.add(i);
+        }
+        windowManager.add(new WaterMarkEvent<>(threshold + windowLength + 1));
+        // All the events of the first batch should be expired when the next watermark is received
+        assertEquals(listener
+                .onExpiryEvents, seqThreshold(1, threshold), "All the events should be expired after the second " +
+                "watermark");
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testExpireThresholdWithWatermarkCountEvictionPolicy() throws Exception {
+        // count-based watermark eviction whose window length equals WindowManager.EXPIRE_EVENTS_THRESHOLD
+        final int length = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        testEvictBeforeWatermarkForWatermarkEvictionPolicy(new WatermarkCountEvictionPolicy(length), length);
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testExpireThresholdWithWatermarkTimeEvictionPolicy() throws Exception {
+        // time-based watermark eviction whose window length equals WindowManager.EXPIRE_EVENTS_THRESHOLD
+        final int length = WindowManager.EXPIRE_EVENTS_THRESHOLD;
+        testEvictBeforeWatermarkForWatermarkEvictionPolicy(new WatermarkTimeEvictionPolicy(length), length);
+    }
+
+    @Test // Processing-time window of 1 second, exercised by setting the eviction reference time by hand.
+    public void testTimeBasedWindow() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new TimeEvictionPolicy<Integer>(Duration
+                .ofSeconds(1).toMillis());
+        windowManager.setEvictionPolicy(evictionPolicy);
+        /*
+         * Don't wait for the TimeTriggerPolicy to fire since this could lead to timing issues in unit
+         * tests. Set its interval to a large value (one day) and invoke onTrigger() manually instead.
+          */
+        TriggerPolicy<Integer, ?> triggerPolicy = new TimeTriggerPolicy<Integer>(Duration.ofDays(1)
+                .toMillis(), windowManager, evictionPolicy, null);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        long now = System.currentTimeMillis();
+
+        // add 50 events with a past ts (one full window length in the past)
+        for (Event<Integer> i : seq(1, 50, now - 1000)) {
+            windowManager.add(i);
+        }
+
+        // add events with the current ts until EXPIRE_EVENTS_THRESHOLD events have been added in total
+        for (Event<Integer> i : seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now)) {
+            windowManager.add(i);
+        }
+        // first 50 should have expired due to expire events threshold
+        assertEquals(50, listener.onExpiryEvents.size());
+
+        // add 100 more events with past ts
+        for (Event<Integer> i : seq(
+                WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100, now - 1000)) {
+            windowManager.add(i);
+        }
+        // simulate the time trigger by setting the reference time and invoking onTrigger() manually
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 100));
+        windowManager.onTrigger();
+
+        // 100 events with past ts should expire
+        assertEquals(100, listener.onExpiryEvents.size());
+        assertEquals(seq(
+                WindowManager.EXPIRE_EVENTS_THRESHOLD + 1,
+                WindowManager.EXPIRE_EVENTS_THRESHOLD + 100, now - 1000), listener.onExpiryEvents);
+        List<Event<Integer>> activationsEvents = seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now); // extended below
+        assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now), listener.onActivationEvents);
+        assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now), listener.onActivationNewEvents);
+        // activation expired list should contain even the ones expired due to EXPIRE_EVENTS_THRESHOLD
+        List<Event<Integer>> expiredList = seq(1, 50, now - 1000);
+        expiredList.addAll(seq(
+                WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100, now - 1000));
+        assertEquals(expiredList, listener.onActivationExpiredEvents);
+
+        listener.clear();
+        // add 100 more events with current ts
+        List<Event<Integer>> newEvents = seq(
+                WindowManager.EXPIRE_EVENTS_THRESHOLD + 101, WindowManager.EXPIRE_EVENTS_THRESHOLD + 200, now);
+        for (Event<Integer> i : newEvents) {
+            windowManager.add(i);
+        }
+        activationsEvents.addAll(newEvents); // expected window: earlier current-ts events plus the new ones
+        // simulate the time trigger by setting the reference time and invoking onTrigger() manually
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 200));
+        windowManager.onTrigger();
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(activationsEvents, listener.onActivationEvents);
+        assertEquals(newEvents, listener.onActivationNewEvents);
+
+    }
+
+
+    @Test // Processing-time window of 100 ms: events stay at +60, expire at +120, and nothing is left at +180.
+    public void testTimeBasedWindowExpiry() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy =
+                new TimeEvictionPolicy<Integer>(Duration.ofMillis(100).toMillis());
+        windowManager.setEvictionPolicy(evictionPolicy);
+        /*
+         * Don't wait for the TimeTriggerPolicy to fire since this could lead to timing issues in unit
+         * tests. Set its interval to a large value (one day) and invoke onTrigger() manually instead.
+          */
+        TriggerPolicy<Integer, ?> triggerPolicy = new TimeTriggerPolicy<Integer>(Duration.ofDays(1)
+                .toMillis(), windowManager, evictionPolicy, null);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        long now = TIMESTAMP;
+        // add 10 events (presumably seq(1, 10) stamps them with TIMESTAMP -- confirm against the helper)
+        for (Event<Integer> i : seq(1, 10)) {
+            windowManager.add(i);
+        }
+        // simulate the time trigger by setting the reference time and invoking onTrigger() manually
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 60));
+        windowManager.onTrigger();
+
+        assertEquals(seq(1, 10), listener.onActivationEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        listener.clear();
+        // no real waiting: move the reference time past the 100 ms window so all events expire
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 120));
+        windowManager.onTrigger();
+
+        assertEquals(seq(1, 10), listener.onExpiryEvents);
+        assertTrue(listener.onActivationEvents.isEmpty());
+        listener.clear();
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 180));
+        windowManager.onTrigger();
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        assertTrue(listener.onActivationEvents.isEmpty());
+
+    }
+
+    @Test // Tumbling count window: eviction count 3 and trigger count 3, so consecutive windows do not overlap.
+    public void testTumblingWindow() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new CountEvictionPolicy<Integer>(3);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy = new CountTriggerPolicy<Integer>(3, windowManager, evictionPolicy);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        windowManager.add(new EventImpl<>(1, TIMESTAMP, null));
+        windowManager.add(new EventImpl<>(2, TIMESTAMP, null));
+        // nothing expired yet
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        windowManager.add(new EventImpl<>(3, TIMESTAMP, null)); // third event: first window 1..3 fires
+        assertTrue(listener.onExpiryEvents.isEmpty());
+        assertEquals(seq(1, 3), listener.onActivationEvents);
+        assertTrue(listener.onActivationExpiredEvents.isEmpty());
+        assertEquals(seq(1, 3), listener.onActivationNewEvents);
+
+        listener.clear();
+        windowManager.add(new EventImpl<>(4, TIMESTAMP, null));
+        windowManager.add(new EventImpl<>(5, TIMESTAMP, null));
+        windowManager.add(new EventImpl<>(6, TIMESTAMP, null)); // sixth event: second window 4..6 fires
+
+        assertEquals(seq(1, 3), listener.onExpiryEvents);
+        assertEquals(seq(4, 6), listener.onActivationEvents);
+        assertEquals(seq(1, 3), listener.onActivationExpiredEvents);
+        assertEquals(seq(4, 6), listener.onActivationNewEvents);
+
+    }
+
+
+    @Test // Event-time window: watermark time eviction of 20 with a watermark time trigger of 10.
+    public void testEventTimeBasedWindow() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy = new WatermarkTimeTriggerPolicy<Integer>(10,
+                windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 603, null));
+        windowManager.add(new EventImpl<>(2, 605, null));
+        windowManager.add(new EventImpl<>(3, 607, null));
+
+        // This should trigger the scan to find
+        // the next aligned window end ts, but not produce any activations
+        windowManager.add(new WaterMarkEvent<Integer>(609));
+        assertEquals(Collections.emptyList(), listener.allOnActivationEvents);
+
+        windowManager.add(new EventImpl<>(4, 618, null));
+        windowManager.add(new EventImpl<>(5, 626, null));
+        windowManager.add(new EventImpl<>(6, 636, null)); // ts is beyond the next watermark (631)
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+        // contents of the three windows, in activation order; event 6 is in none of them
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null)
+        }), listener.allOnActivationEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null)
+        }), listener.allOnActivationEvents.get(2));
+
+        assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(0));
+        assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null)
+        }), listener.allOnActivationExpiredEvents.get(2));
+
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null)
+        }), listener.allOnActivationNewEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null)
+        }), listener.allOnActivationNewEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(5, 626, null)
+        }), listener.allOnActivationNewEvents.get(2));
+
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null)
+        }), listener.allOnExpiryEvents.get(0));
+
+        // add more events with a gap in ts
+        windowManager.add(new EventImpl<>(7, 825, null));
+        windowManager.add(new EventImpl<>(8, 826, null));
+        windowManager.add(new EventImpl<>(9, 827, null));
+        windowManager.add(new EventImpl<>(10, 839, null)); // ts is beyond the next watermark (834)
+
+        listener.clear();
+        windowManager.add(new WaterMarkEvent<Integer>(834));
+
+        // again three windows: two draining the older events, then one holding the events after the gap
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnActivationEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(7, 825, null),
+                new EventImpl<>(8, 826, null),
+                new EventImpl<>(9, 827, null)
+        }), listener.allOnActivationEvents.get(2));
+
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null)
+        }), listener.allOnActivationExpiredEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(5, 626, null)
+        }), listener.allOnActivationExpiredEvents.get(1));
+        assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(2));
+
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnActivationNewEvents.get(0));
+        assertEquals(Collections.emptyList(), listener.allOnActivationNewEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(7, 825, null),
+                new EventImpl<>(8, 826, null),
+                new EventImpl<>(9, 827, null)
+        }), listener.allOnActivationNewEvents.get(2));
+
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null)
+        }), listener.allOnExpiryEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(5, 626, null)
+        }), listener.allOnExpiryEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnExpiryEvents.get(2));
+    }
+
+    @Test // Watermark count eviction of 3 events combined with a watermark time trigger of 10.
+    public void testCountBasedWindowWithEventTs() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkCountEvictionPolicy<>(3);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy
+                = new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 603, null));
+        windowManager.add(new EventImpl<>(2, 605, null));
+        windowManager.add(new EventImpl<>(3, 607, null));
+        windowManager.add(new EventImpl<>(4, 618, null));
+        windowManager.add(new EventImpl<>(5, 626, null));
+        windowManager.add(new EventImpl<>(6, 636, null)); // ts is beyond the next watermark (631)
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+        // each window holds three events; event 6 is in none of them
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null)
+        }), listener.allOnActivationEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null)
+        }), listener.allOnActivationEvents.get(2));
+
+        // add more events with a gap in ts
+        windowManager.add(new EventImpl<>(7, 665, null));
+        windowManager.add(new EventImpl<>(8, 666, null));
+        windowManager.add(new EventImpl<>(9, 667, null));
+        windowManager.add(new EventImpl<>(10, 679, null)); // ts is beyond the next watermark (674)
+
+        listener.clear();
+        windowManager.add(new WaterMarkEvent<Integer>(674));
+        assertEquals(4, listener.allOnActivationEvents.size());
+        // the same set of events (4, 5, 6) is part of the first three windows
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnActivationEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(6, 636, null)
+        }), listener.allOnActivationEvents.get(2));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(7, 665, null),
+                new EventImpl<>(8, 666, null),
+                new EventImpl<>(9, 667, null)
+        }), listener.allOnActivationEvents.get(3));
+    }
+
+    @Test // Watermark time eviction of 20 combined with a watermark count trigger of 3 events.
+    public void testCountBasedTriggerWithEventTs() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkTimeEvictionPolicy<Integer>(20);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy
+                = new WatermarkCountTriggerPolicy<Integer>(3, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 603, null));
+        windowManager.add(new EventImpl<>(2, 605, null));
+        windowManager.add(new EventImpl<>(3, 607, null));
+        windowManager.add(new EventImpl<>(4, 618, null));
+        windowManager.add(new EventImpl<>(5, 625, null));
+        windowManager.add(new EventImpl<>(6, 626, null));
+        windowManager.add(new EventImpl<>(7, 629, null));
+        windowManager.add(new EventImpl<>(8, 636, null)); // ts is beyond the next watermark (631)
+        // send a watermark event, which should trigger two windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+        assertEquals(2, listener.allOnActivationEvents.size());
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 625, null),
+                new EventImpl<>(6, 626, null)
+
+        }), listener.allOnActivationEvents.get(1));
+
+        // add more events with a gap in ts
+        windowManager.add(new EventImpl<>(9, 665, null));
+        windowManager.add(new EventImpl<>(10, 666, null));
+        windowManager.add(new EventImpl<>(11, 667, null));
+        windowManager.add(new EventImpl<>(12, 669, null));
+        windowManager.add(new EventImpl<>(12, 679, null)); // NOTE(review): value 12 reused; presumably 13 was meant
+
+        listener.clear();
+        windowManager.add(new WaterMarkEvent<Integer>(674));
+        assertEquals(2, listener.allOnActivationEvents.size());
+        // two windows again; the event with ts 679 is beyond the watermark and is in neither
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(9, 665, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(9, 665, null),
+                new EventImpl<>(10, 666, null),
+                new EventImpl<>(11, 667, null),
+                new EventImpl<>(12, 669, null),
+        }), listener.allOnActivationEvents.get(1));
+    }
+
+    @Test // Tumbling watermark count window (evict 2, trigger 2) when several events share a timestamp.
+    public void testCountBasedTumblingWithSameEventTs() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkCountEvictionPolicy<>(2);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy
+                = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 10, null));
+        windowManager.add(new EventImpl<>(2, 10, null));
+        windowManager.add(new EventImpl<>(3, 11, null));
+        windowManager.add(new EventImpl<>(4, 12, null));
+        windowManager.add(new EventImpl<>(5, 12, null));
+        windowManager.add(new EventImpl<>(6, 12, null));
+        windowManager.add(new EventImpl<>(7, 12, null));
+        windowManager.add(new EventImpl<>(8, 13, null));
+        windowManager.add(new EventImpl<>(9, 14, null));
+        windowManager.add(new EventImpl<>(10, 15, null));
+
+        windowManager.add(new WaterMarkEvent<Integer>(20)); // watermark is past every event added above
+        assertEquals(5, listener.allOnActivationEvents.size()); // ten events in non-overlapping pairs
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 10, null),
+                new EventImpl<>(2, 10, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(3, 11, null),
+                new EventImpl<>(4, 12, null)
+        }), listener.allOnActivationEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(5, 12, null),
+                new EventImpl<>(6, 12, null)
+        }), listener.allOnActivationEvents.get(2));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(7, 12, null),
+                new EventImpl<>(8, 13, null)
+        }), listener.allOnActivationEvents.get(3));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(9, 14, null),
+                new EventImpl<>(10, 15, null)
+        }), listener.allOnActivationEvents.get(4));
+    }
+
+    @Test // Sliding watermark count window (evict 5, trigger 2) when several events share a timestamp.
+    public void testCountBasedSlidingWithSameEventTs() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkCountEvictionPolicy<>(5);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy
+                = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 10, null));
+        windowManager.add(new EventImpl<>(2, 10, null));
+        windowManager.add(new EventImpl<>(3, 11, null));
+        windowManager.add(new EventImpl<>(4, 12, null));
+        windowManager.add(new EventImpl<>(5, 12, null));
+        windowManager.add(new EventImpl<>(6, 12, null));
+        windowManager.add(new EventImpl<>(7, 12, null));
+        windowManager.add(new EventImpl<>(8, 13, null));
+        windowManager.add(new EventImpl<>(9, 14, null));
+        windowManager.add(new EventImpl<>(10, 15, null));
+
+        windowManager.add(new WaterMarkEvent<Integer>(20)); // watermark is past every event added above
+        assertEquals(5, listener.allOnActivationEvents.size()); // one window per two events, at most five events each
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 10, null),
+                new EventImpl<>(2, 10, null)
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 10, null),
+                new EventImpl<>(2, 10, null),
+                new EventImpl<>(3, 11, null),
+                new EventImpl<>(4, 12, null)
+        }), listener.allOnActivationEvents.get(1));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(2, 10, null),
+                new EventImpl<>(3, 11, null),
+                new EventImpl<>(4, 12, null),
+                new EventImpl<>(5, 12, null),
+                new EventImpl<>(6, 12, null)
+        }), listener.allOnActivationEvents.get(2));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 12, null),
+                new EventImpl<>(5, 12, null),
+                new EventImpl<>(6, 12, null),
+                new EventImpl<>(7, 12, null),
+                new EventImpl<>(8, 13, null)
+        }), listener.allOnActivationEvents.get(3));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(6, 12, null),
+                new EventImpl<>(7, 12, null),
+                new EventImpl<>(8, 13, null),
+                new EventImpl<>(9, 14, null),
+                new EventImpl<>(10, 15, null),
+        }), listener.allOnActivationEvents.get(4));
+    }
+
+    @Test
+    public void testEventTimeLag() throws Exception {
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5); // NOTE(review): presumably (window length, max lag) — confirm order and units
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy
+                = new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager); // NOTE(review): 10 is presumably the sliding interval — confirm
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 603, null));
+        windowManager.add(new EventImpl<>(2, 605, null));
+        windowManager.add(new EventImpl<>(3, 607, null));
+        windowManager.add(new EventImpl<>(4, 618, null));
+        windowManager.add(new EventImpl<>(5, 626, null));
+        windowManager.add(new EventImpl<>(6, 632, null));
+        windowManager.add(new EventImpl<>(7, 629, null)); // out of order: timestamp 629 is added after 632
+        windowManager.add(new EventImpl<>(8, 636, null));
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null),
+        }), listener.allOnActivationEvents.get(1));
+        // out-of-order events should be processed up to the lag: event 7 (ts 629) is expected, events 6 and 8 (ts > 631) are not
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(7, 629, null)
+        }), listener.allOnActivationEvents.get(2));
+    }
+
+    @Test
+    public void testScanStop() throws Exception {
+        final Set<Event<Integer>> eventsScanned = new HashSet<>(); // collects every event passed to evict() below
+        EvictionPolicy<Integer, ?> evictionPolicy = new WatermarkTimeEvictionPolicy<Integer>(20, 5) {
+
+            @Override
+            public Action evict(Event<Integer> event) {
+                eventsScanned.add(event); // record the event, then defer to the real policy
+                return super.evict(event);
+            }
+
+        };
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer, ?> triggerPolicy
+                = new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(new EventImpl<>(1, 603, null)); // timestamps are strictly increasing in this test
+        windowManager.add(new EventImpl<>(2, 605, null));
+        windowManager.add(new EventImpl<>(3, 607, null));
+        windowManager.add(new EventImpl<>(4, 618, null));
+        windowManager.add(new EventImpl<>(5, 626, null));
+        windowManager.add(new EventImpl<>(6, 629, null));
+        windowManager.add(new EventImpl<>(7, 636, null));
+        windowManager.add(new EventImpl<>(8, 637, null));
+        windowManager.add(new EventImpl<>(9, 638, null));
+        windowManager.add(new EventImpl<>(10, 639, null));
+
+        // send a watermark event, which should trigger three windows.
+        windowManager.add(new WaterMarkEvent<Integer>(631));
+
+        assertEquals(3, listener.allOnActivationEvents.size());
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+        }), listener.allOnActivationEvents.get(0));
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null),
+        }), listener.allOnActivationEvents.get(1));
+
+        // third window holds events 4-6; events 7-10 (ts 636-639) are later than the watermark (631)
+        assertEquals(Arrays.asList(new Event[]{
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(6, 629, null)
+        }), listener.allOnActivationEvents.get(2));
+
+        // events 8, 9, 10 should not be scanned at all since TimeEvictionPolicy lag 5s should break
+        // the WindowManager scan loop early.
+        assertEquals(new HashSet<>(Arrays.asList(new Event[]{
+                new EventImpl<>(1, 603, null),
+                new EventImpl<>(2, 605, null),
+                new EventImpl<>(3, 607, null),
+                new EventImpl<>(4, 618, null),
+                new EventImpl<>(5, 626, null),
+                new EventImpl<>(6, 629, null),
+                new EventImpl<>(7, 636, null) // event 7 is the last one scanned
+        })), eventsScanned);
+    }
+
+    private List<Event<Integer>> seq(int start) {
+        return seq(start, start);
+    }
+
+    private List<Event<Integer>> seq(int start, int stop) {
+        return seq(start, stop, null);
+    }
+
+    private List<Event<Integer>> seq(int start, int stop, Long ts) {
+        long timestamp = TIMESTAMP;
+        if (ts != null) {
+            timestamp = ts;
+        }
+        List<Event<Integer>> ints = new ArrayList<>();
+        for (int i = start; i <= stop; i++) {
+            ints.add(new EventImpl<>(i, timestamp, null));
+        }
+        return ints;
+    }
+
+    private List<Event<Integer>> seqThreshold(int start, int stop) {
+        List<Event<Integer>> ints = new ArrayList<>();
+        for (int i = start; i <= stop; i++) {
+            ints.add(new EventImpl<>(i, i, null));
+        }
+        return ints;
+    }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 01e385219c..e97b692f9d 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -25,7 +25,7 @@
 public class PublishFunction implements Function<String, Void> {
     @Override
     public Void process(String input, Context context) {
-        String publishTopic = context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish");
+        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish");
         String output = String.format("%s!", input);
         context.publish(publishTopic, output, DefaultSerDe.class.getName());
         return null;
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
index 6ff3dd8786..fb3ceb01c7 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -28,11 +28,11 @@
     @Override
     public Void process(String input, Context context) {
         String key = "config-key";
-        Optional<String> maybeValue = context.getUserConfigValue(key);
+        Optional<Object> maybeValue = context.getUserConfigValue(key);
         Logger LOG = context.getLogger();
 
         if (maybeValue.isPresent()) {
-            String value = maybeValue.get();
+            String value = (String) maybeValue.get();
             LOG.info("The config value is {}", value);
         } else {
             LOG.error("No value present for the key {}", key);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
new file mode 100644
index 0000000000..189b2ca103
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collection;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+
+/**
+ * Example function that sums the integers in the collection it is given.
+ */
+@Slf4j
+public class WindowFunction implements Function <Collection<Integer>, Integer> {
+    /**
+     * @param integers values to add up; an empty collection is allowed
+     * @return the sum of the values, or 0 when the collection is empty
+     */
+    @Override
+    public Integer apply(Collection<Integer> integers) {
+        return integers.stream().reduce(0, Integer::sum);
+    }
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 4f44f9b59b..07238eba99 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -51,7 +51,7 @@ message FunctionDetails {
     string className = 4;
     string logTopic = 5;
     ProcessingGuarantees processingGuarantees = 6;
-    map<string,string> userConfig = 7;
+    string userConfig = 7;
     Runtime runtime = 8;
     bool autoAck = 9;
     int32 parallelism = 10;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 68ac380954..1f34b8b7e9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -23,7 +23,6 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.Empty;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
@@ -150,9 +149,7 @@ public void start() throws Exception {
             functionDetailsBuilder.setAutoAck(false);
         }
         if (userConfig != null && !userConfig.isEmpty()) {
-            Type type = new TypeToken<Map<String, String>>(){}.getType();
-            Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type);
-            functionDetailsBuilder.putAllUserConfig(userConfigMap);
+            functionDetailsBuilder.setUserConfig(userConfig);
         }
 
         // Setup source
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 568c51fbaa..5b77eb277c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -133,10 +133,10 @@
         args.add(pulsarServiceUrl);
         args.add("--max_buffered_tuples");
         args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-        Map<String, String> userConfig = instanceConfig.getFunctionDetails().getUserConfigMap();
+        String userConfig = instanceConfig.getFunctionDetails().getUserConfig();
         if (userConfig != null && !userConfig.isEmpty()) {
             args.add("--user_config");
-            args.add(new Gson().toJson(userConfig));
+            args.add(userConfig);
         }
         args.add("--port");
         args.add(String.valueOf(instanceConfig.getPort()));
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 5baecfb714..5895179a8f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -74,11 +74,12 @@
 
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
-    private Map<String, String> userConfig = new HashMap<>();
+    private Map<String, Object> userConfig = new HashMap<>();
     private SubscriptionType subscriptionType;
     private Runtime runtime;
     private boolean autoAck;
     private int parallelism;
     private Resources resources;
     private String fqfn;
+    private WindowConfig windowConfig;
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/WindowConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/WindowConfig.java
new file mode 100644
index 0000000000..0ea4c79f58
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/WindowConfig.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Data // Lombok: generates getters, setters, equals/hashCode and toString for the fields below
+@Setter // NOTE(review): @Setter, @Getter and @ToString look redundant next to @Data — confirm before removing
+@Getter
+@Accessors(chain = true) // generated setters return this, so calls can be chained
+@ToString
+public class WindowConfig {
+
+    public static final String WINDOW_CONFIG_KEY = "__WINDOWCONFIGS__"; // NOTE(review): presumably the user-config key holding the window config — confirm
+
+    private Integer windowLengthCount; // boxed type: stays null unless set (same for every field below)
+
+    private Long windowLengthDurationMs; // NOTE(review): presumably the time-based alternative to windowLengthCount — confirm
+
+    private Integer slidingIntervalCount;
+
+    private Long slidingIntervalDurationMs;
+
+    private String lateDataTopic; // NOTE(review): presumably the topic that receives late events — confirm
+
+    private Long maxLagMs;
+
+    private Long watermarkEmitIntervalMs;
+
+    private String timestampExtractorClassName; // class name as a string; how it is loaded is not shown here
+
+    private String actualWindowFunctionClassName; // class name as a string; how it is loaded is not shown here
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services