You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/07/31 01:03:54 UTC
[incubator-pulsar] branch master updated: Add Test Specific
Examples (#2253)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d0350fc Add Test Specific Examples (#2253)
d0350fc is described below
commit d0350fc2cbec3e103cbec262a0ea61d90bf0d062
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Mon Jul 30 18:03:52 2018 -0700
Add Test Specific Examples (#2253)
These are function examples specifically for test scenarios.
---
.../functions/api/examples/ByteBufferSerDe.java | 39 ++++++++++++++++++++
.../api/examples/CommaWindowFunction.java | 31 ++++++++++++++++
.../functions/api/examples/CustomBaseObject.java | 32 ++++++++++++++++
.../functions/api/examples/CustomBaseSerde.java | 42 +++++++++++++++++++++
.../api/examples/CustomBaseToBaseFunction.java | 34 +++++++++++++++++
.../api/examples/CustomBaseToDerivedFunction.java | 34 +++++++++++++++++
.../api/examples/CustomDerivedObject.java | 34 +++++++++++++++++
.../functions/api/examples/CustomDerivedSerde.java | 43 ++++++++++++++++++++++
.../api/examples/CustomDerivedToBaseFunction.java | 34 +++++++++++++++++
.../examples/CustomDerivedToDerivedFunction.java | 34 +++++++++++++++++
.../functions/api/examples/UserConfigFunction.java | 41 +++++++++++++++++++++
.../api/examples/UserExceptionFunction.java | 34 +++++++++++++++++
.../api/examples/UserPublishFunction.java | 41 +++++++++++++++++++++
.../api/examples/WindowDurationFunction.java | 32 ++++++++++++++++
14 files changed, 505 insertions(+)
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
new file mode 100644
index 0000000..f433372
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.SerDe;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Simple ByteBuffer Serializer and Deserializer
+ */
+public class ByteBufferSerDe implements SerDe<Integer> {
+ @Override
+ public Integer deserialize(byte[] bytes) {
+ return ByteBuffer.wrap(bytes).getInt();
+ }
+
+ @Override
+ public byte[] serialize(Integer integer) {
+ return ByteBuffer.allocate(4).putInt(integer).array();
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
new file mode 100644
index 0000000..da696fd
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.Collection;
+
+/**
+ * Comma based aggregation window function example
+ */
+public class CommaWindowFunction implements java.util.function.Function<Collection<String>, String> {
+ @Override
+ public String apply(Collection<String> integers) {
+ return String.join(",", integers);
+ }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseObject.java
new file mode 100644
index 0000000..173393b
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseObject.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class CustomBaseObject {
+ private long baseValue;
+}
+
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
new file mode 100644
index 0000000..b8a6a57
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.SerDe;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Example of using a byte buffer serialization for Custom object
+ */
+public class CustomBaseSerde implements SerDe<CustomBaseObject> {
+ @Override
+ public CustomBaseObject deserialize(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ return new CustomBaseObject(buffer.getLong());
+ }
+
+ @Override
+ public byte[] serialize(CustomBaseObject object) {
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.putLong(object.getBaseValue());
+ return buffer.array();
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
new file mode 100644
index 0000000..d3b9225
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Function example of processing on a custom object type
+ */
+public class CustomBaseToBaseFunction implements Function<CustomBaseObject, CustomBaseObject> {
+
+ @Override
+ public CustomBaseObject process(CustomBaseObject input, Context context) {
+ return new CustomBaseObject(input.getBaseValue() + 100);
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
new file mode 100644
index 0000000..9e8cab5
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Examplf of fucntion doing a type object conversion between input ann output type
+ */
+public class CustomBaseToDerivedFunction implements Function<CustomBaseObject, CustomDerivedObject> {
+
+ @Override
+ public CustomDerivedObject process(CustomBaseObject input, Context context) {
+ return new CustomDerivedObject(input.getBaseValue() + 100, (int)input.getBaseValue() + 50);
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedObject.java
new file mode 100644
index 0000000..464e977
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedObject.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class CustomDerivedObject extends CustomBaseObject {
+ private int derivedValue;
+ public CustomDerivedObject(long baseValue, int derivedValue) {
+ super(baseValue);
+ this.derivedValue = derivedValue;
+ }
+}
+
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
new file mode 100644
index 0000000..219cdad
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.SerDe;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Example to derived object serialization
+ */
+public class CustomDerivedSerde implements SerDe<CustomDerivedObject> {
+ @Override
+ public CustomDerivedObject deserialize(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ return new CustomDerivedObject(buffer.getLong(), buffer.getInt());
+ }
+
+ @Override
+ public byte[] serialize(CustomDerivedObject object) {
+ ByteBuffer buffer = ByteBuffer.allocate(12);
+ buffer.putLong(object.getBaseValue());
+ buffer.putInt(object.getDerivedValue());
+ return buffer.array();
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToBaseFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToBaseFunction.java
new file mode 100644
index 0000000..8bcb0d0
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToBaseFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Example of function converting a derived object to a base object.
+ */
+public class CustomDerivedToBaseFunction implements Function<CustomDerivedObject, CustomBaseObject> {
+
+ @Override
+ public CustomBaseObject process(CustomDerivedObject input, Context context) {
+ return new CustomBaseObject(input.getBaseValue() + 101);
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
new file mode 100644
index 0000000..a86c82d
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Example of 2nd order conversion from a base object for composition pipelines
+ */
+public class CustomDerivedToDerivedFunction implements Function<CustomDerivedObject, CustomDerivedObject> {
+
+ @Override
+ public CustomDerivedObject process(CustomDerivedObject input, Context context) {
+ return new CustomDerivedObject(input.getBaseValue() + 101, input.getDerivedValue() + 150);
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
new file mode 100644
index 0000000..add14d4
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+import java.util.Optional;
+
+/**
+ * An example demonstrate retrieving user config value from Context
+ */
+public class UserConfigFunction implements Function<String, String> {
+
+ @Override
+ public String process(String input, Context context) {
+ Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
+ if (whatToWrite.get() != null) {
+ return (String)whatToWrite.get();
+ } else {
+ return "Not a nice way";
+ }
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserExceptionFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserExceptionFunction.java
new file mode 100644
index 0000000..2979d22
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserExceptionFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+
+/**
+ * This Function simulates a pulsar function encountering runtime errors.
+ */
+public class UserExceptionFunction implements Function<String, String> {
+ @Override
+ public String process(String input, Context context) {
+ throw new RuntimeException("This wont work");
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
new file mode 100644
index 0000000..1d31c47
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+
+import java.util.Optional;
+
+/**
+ * An example demonstrate publishing messages through Context
+ */
+public class UserPublishFunction implements Function<String, Void> {
+
+ @Override
+ public Void process(String input, Context context) {
+ Optional<Object> topicToWrite = context.getUserConfigValue("topic");
+ if (topicToWrite.get() != null) {
+ context.publish((String)topicToWrite.get(), input, DefaultSerDe.class.getName());
+ }
+ return null;
+ }
+}
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
new file mode 100644
index 0000000..cb0aa9f
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.Collection;
+
+/**
+ * This functions collects the timestamp during the window operation
+ */
+public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
+ @Override
+ public String apply(Collection<String> integers) {
+ long time = System.currentTimeMillis();
+ return String.format("%s:%s", String.join(",", integers), time);
+ }
+}