You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/30 09:00:45 UTC

[4/4] incubator-rocketmq git commit: [ROCKETMQ-17] Develop a vendor-neutral open standard for distributed messaging: add messaging relay service standard ASF JIRA: https://issues.apache.org/jira/browse/ROCKETMQ-17

[ROCKETMQ-17] Develop a vendor-neutral open standard for distributed messaging: add messaging relay service standard
ASF JIRA: https://issues.apache.org/jira/browse/ROCKETMQ-17


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/3735a3f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/3735a3f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/3735a3f4

Branch: refs/heads/spec
Commit: 3735a3f4db44849d3d0b12970b1f64b7d277d675
Parents: 53eda1a
Author: vintagewang <vi...@apache.org>
Authored: Fri Dec 30 17:00:19 2016 +0800
Committer: vintagewang <vi...@apache.org>
Committed: Fri Dec 30 17:00:19 2016 +0800

----------------------------------------------------------------------
 spec/code/pom.xml                               |  10 +-
 .../org/apache/openrelay/InvokeContext.java     |  21 +++
 .../java/org/apache/openrelay/KeyValue.java     |  36 +++++
 .../org/apache/openrelay/ServiceEndPoint.java   |  95 +++++++++++++
 .../openrelay/ServiceEndPointManager.java       |  35 +++++
 .../org/apache/openrelay/ServiceLifecycle.java  |  24 ++++
 .../apache/openrelay/ServiceLoadBalance.java    |  33 +++++
 .../org/apache/openrelay/ServiceProperties.java |  32 +++++
 .../internal/ServiceEndPointAdapter.java        | 132 +++++++++++++++++++
 .../org/apache/openrelay/observer/Observer.java |  39 ++++++
 10 files changed, 455 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/pom.xml
----------------------------------------------------------------------
diff --git a/spec/code/pom.xml b/spec/code/pom.xml
index 097431b..ec66577 100644
--- a/spec/code/pom.xml
+++ b/spec/code/pom.xml
@@ -13,6 +13,7 @@
         <module>messaging-user-level-api/java</module>
         <module>messaging-user-level-samples/java</module>
         <module>messaging-wire-level-api</module>
+        <module>relay-user-level-api/java</module>
     </modules>
 
     <build>
@@ -22,8 +23,8 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>2.3.2</version>
                 <configuration>
-                    <source>1.6</source>
-                    <target>1.6</target>
+                    <source>1.7</source>
+                    <target>1.7</target>
                     <encoding>UTF-8</encoding>
                     <showDeprecation>true</showDeprecation>
                     <showWarnings>true</showWarnings>
@@ -101,6 +102,11 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>relay-user-level-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
                 <version>4.11</version>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java
new file mode 100644
index 0000000..84c987d
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/InvokeContext.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openrelay;
+
+public interface InvokeContext {
+    KeyValue properties();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java
new file mode 100644
index 0000000..b79023c
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/KeyValue.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay;
+
+public interface KeyValue {
+    KeyValue put(final String key, final int value);
+
+    KeyValue put(final String key, final long value);
+
+    KeyValue put(final String key, final double value);
+
+    KeyValue put(final String key, final String value);
+
+    int getInt(final String key);
+
+    long getLong(final String key);
+
+    double getDouble(final String key);
+
+    String getString(final String key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java
new file mode 100644
index 0000000..f48f93e
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPoint.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openrelay;
+
+import java.util.Properties;
+import org.apache.openrelay.observer.Observer;
+
+public interface ServiceEndPoint extends ServiceLifecycle {
+    /**
+     * Register/re-register a service in a serviceEndPoint object
+     * If the service is already registered in the serviceEndPoint object, a duplicate registration will fail
+     *
+     * @param service the service to publish in serviceEndPoint
+     */
+    void publish(Object service);
+
+    /**
+     * Like {@link #publish(Object)} but specifying {@code properties}
+     * that can be used to configure the service published
+     *
+     * @param service the service to publish in serviceEndPoint
+     * @param properties the service published properties
+     */
+
+    void publish(Object service, Properties properties);
+
+    /**
+     * Bind a service object to serviceEndPoint, which can directly call services provided by service object
+     *
+     * @param type service type to bind in serviceEndPoint
+     * @return service proxy object to bind
+     */
+    <T> T bind(Class<T> type);
+
+    /**
+     * Like {@link #bind(Class)} but specifying {@code properties} that can be used to configure the service bound
+     *
+     * @param type service type to bind in serviceEndPoint
+     * @param properties the service bind properties
+     * @param <T> the type of the service proxy object to bind
+     * @return service proxy object to bind
+     */
+    <T> T bind(Class<T> type, Properties properties);
+
+    /**
+     * Like {@link #bind(Class, Properties)} but specifying {@code serviceLoadBalance} that can be used to select
+     * endPoint target
+     *
+     * @param type service type to bind in serviceConsumer
+     * @param properties the service bind properties
+     * @param serviceLoadBalance select endPoint target algorithm
+     * @param <T> the type of the service proxy object to bind
+     * @return service proxy object to bind
+     */
+    <T> T bind(Class<T> type, Properties properties, ServiceLoadBalance serviceLoadBalance);
+
+    /**
+     * Register an observer in a serviceEndPoint object. Whenever the serviceEndPoint object publishes or binds a
+     * service object, all previously registered observers will be notified
+     *
+     * @param observer observer event object to add to a serviceEndPoint object
+     */
+    void addObserver(Observer observer);
+
+    /**
+     * Removes the given observer from the list of observers
+     * <p>
+     * If the given observer has not been previously registered (i.e. it was
+     * never added) then this method call is a no-op. If it had been previously
+     * added then it will be removed. If it had been added more than once, then
+     * only the first occurrence will be removed.
+     *
+     * @param observer The observer to remove
+     */
+    void deleteObserver(Observer observer);
+
+    /**
+     * @return
+     */
+    InvokeContext invokeContext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java
new file mode 100644
index 0000000..49e07ab
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceEndPointManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay;
+
+import org.apache.openrelay.internal.ServiceEndPointAdapter;
+
+public class ServiceEndPointManager {
+    public static ServiceEndPoint getServiceEndPoint(String url) throws Exception {
+        return getServiceEndPoint(url, null);
+    }
+
+    public static ServiceEndPoint getServiceEndPoint(String url, KeyValue properties) throws Exception {
+        return ServiceEndPointAdapter.createServiceEndPoint(url, properties);
+    }
+
+    public static KeyValue buildKeyValue() {
+        return null;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java
new file mode 100644
index 0000000..d0c82af
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLifecycle.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay;
+
+public interface ServiceLifecycle {
+    void start();
+
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java
new file mode 100644
index 0000000..9c4bd17
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceLoadBalance.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay;
+
+import java.util.Set;
+
+public interface ServiceLoadBalance {
+    /**
+     * Select a collection of eligible providerServicePoint objects from the list of providerServicePoint provided.
+     * Different selection strategies, such as RoundRobin or Random, can be used to select the providerServicePoint
+     * objects that satisfy the application's needs.
+     *
+     * @param servicePropertiesList providerServicePoint to choose from.
+     * @return a collection of eligible providerServicePoint object
+     */
+    Set<ServiceProperties> select(Set<ServiceProperties> servicePropertiesList);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java
new file mode 100644
index 0000000..423abd8
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/ServiceProperties.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay;
+
+public interface ServiceProperties {
+    String id();
+
+    void id(String id);
+
+    String relayAddress();
+
+    void relayAddress(String address);
+
+    String providerId();
+
+    void providerId(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java
new file mode 100644
index 0000000..06ce69a
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/internal/ServiceEndPointAdapter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay.internal;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.openrelay.KeyValue;
+import org.apache.openrelay.ServiceEndPoint;
+
+public class ServiceEndPointAdapter {
+    private static final String PROTOCOL_NAME = "protocol";
+    private static final String SPI_NAME = "spi";
+    private static final String URL_NAME = "urls";
+    private static final String URL = "url";
+    private static final String DEFAULT_SERVICE_END_POINT = "rocketmq";
+    private static final String DEFAULT_SERVICE_IMPL = "org.apache.rocketmq.openrelay.impl.ServiceEndPointStandardImpl";
+    private static final String URL_SEPARATOR = ":";
+    private static final String LIST_SEPARATOR = ",";
+    private static final String PARAM_SEPARATOR = "&";
+    private static final String KV_SEPARATOR = "=";
+    private static Map<String, String> serviceEndPointClassMap = new HashMap<>();
+
+    static {
+        serviceEndPointClassMap.put(DEFAULT_SERVICE_END_POINT, DEFAULT_SERVICE_IMPL);
+    }
+
+    private static Map<String, List<String>> parseURI(String uri) {
+        if (uri == null || uri.length() == 0) {
+            return new HashMap<>();
+        }
+
+        int spiIndex = 0;
+        int index = uri.indexOf(URL_SEPARATOR);
+        Map<String, List<String>> results = new HashMap<>();
+        String protocol = uri.substring(0, index);
+        List<String> protocolSet = new ArrayList<>();
+        protocolSet.add(protocol);
+        results.put(PROTOCOL_NAME, protocolSet);
+        if (index > 0) {
+            String spi;
+            spiIndex = uri.indexOf(URL_SEPARATOR, index + 1);
+            if (spiIndex > 0) {
+                spi = uri.substring(index + 1, spiIndex);
+            }
+            else {
+                spi = uri.substring(index + 1);
+            }
+            List<String> spiSet = new ArrayList<>();
+            spiSet.add(spi);
+            results.put(SPI_NAME, spiSet);
+        }
+        if (spiIndex > 0) {
+            String urlList = uri.substring(spiIndex + 1);
+            String[] list = urlList.split(LIST_SEPARATOR);
+            if (list.length > 0) {
+                results.put(URL_NAME, Arrays.asList(list));
+            }
+        }
+        return results;
+    }
+
+    private static ServiceEndPoint instantiateServiceEndPoint(String driver, KeyValue properties)
+        throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+        InvocationTargetException, InstantiationException {
+        String serviceImpl = driver;
+        if (serviceImpl == null)
+            serviceImpl = DEFAULT_SERVICE_IMPL;
+        if (serviceEndPointClassMap.containsKey(driver))
+            serviceImpl = serviceEndPointClassMap.get(driver);
+        Class<?> serviceEndPointClass = Class.forName(serviceImpl);
+        if (serviceEndPointClass == null)
+            return null;
+
+        if (properties.getString(URL) != null) {
+            String[] propertySplits = ((String)properties.getString(URL)).split(PARAM_SEPARATOR);
+            if (propertySplits.length > 0) {
+                for (int index = 1; index < propertySplits.length; index++) {
+                    String[] kv = propertySplits[index].split(KV_SEPARATOR);
+                    properties.put(kv[0], kv[1]);
+                }
+            }
+        }
+        Class[] paramTypes = {Properties.class};
+        Constructor constructor = serviceEndPointClass.getConstructor(paramTypes);
+        assert constructor != null;
+        return (ServiceEndPoint)constructor.newInstance(properties);
+    }
+
+    private static ServiceEndPoint createServiceEndPoint(Map<String, List<String>> url, KeyValue properties)
+        throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
+        InstantiationException, IllegalAccessException {
+        List<String> driver = url.get(SPI_NAME);
+        List<String> urls = url.get(URL_NAME);
+        Collections.shuffle(urls);
+        Collections.shuffle(driver);
+        if (urls.size() > 0)
+            properties.put(URL, urls.get(0));
+        return ServiceEndPointAdapter.instantiateServiceEndPoint(driver.get(0), properties);
+    }
+
+    public static ServiceEndPoint createServiceEndPoint(String url, KeyValue properties)
+        throws ClassNotFoundException, NoSuchMethodException, InstantiationException,
+        IllegalAccessException, InvocationTargetException {
+        Map<String, List<String>> driverUrl = parseURI(url);
+        if (null == driverUrl || driverUrl.size() == 0) {
+            throw new IllegalArgumentException("driver url parsed result.size ==0");
+        }
+        return createServiceEndPoint(driverUrl, properties);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3735a3f4/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java
----------------------------------------------------------------------
diff --git a/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java
new file mode 100644
index 0000000..097d309
--- /dev/null
+++ b/spec/code/relay-user-level-api/java/src/main/java/org/apache/openrelay/observer/Observer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.openrelay.observer;
+
+import java.util.Observable;
+
+public interface Observer<T> {
+    /**
+     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
+     * <p>
+     * The {@link Observable} will not call this method if it calls {@link #onError}.
+     */
+    void onCompleted();
+
+    /**
+     * Notifies the Observer that the {@link Observable} has experienced an error condition.
+     * <p>
+     * If the {@link Observable} calls this method, it will not thereafter call
+     * {@link #onCompleted}.
+     *
+     * @param e the exception encountered by the Observable
+     */
+    void onError(Throwable e);
+}