You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 16:50:49 UTC

[02/17] ignite git commit: IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.

IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1dee96e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1dee96e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1dee96e

Branch: refs/heads/ignite-1093-2
Commit: b1dee96e0d2b6130959400615b29b02721075392
Parents: 4d9734a
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:48:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 00:48:46 2015 +0100

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |  5 +-
 .../org/apache/ignite/stream/StreamAdapter.java | 64 ++++++++++++++++----
 .../stream/StreamSingleTupleExtractor.java      | 40 ++++++++++++
 .../ignite/stream/StreamTupleExtractor.java     | 23 +++----
 .../ignite/stream/socket/SocketStreamer.java    |  2 +-
 .../stream/socket/SocketStreamerSelfTest.java   |  6 +-
 6 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 8e961b9..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.examples.streaming.wordcount.socket;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.util.Map;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
@@ -31,7 +32,7 @@ import org.apache.ignite.examples.ExamplesUtils;
 import org.apache.ignite.examples.streaming.wordcount.CacheConfig;
 import org.apache.ignite.examples.streaming.wordcount.QueryWords;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.stream.StreamTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.socket.SocketMessageConverter;
 import org.apache.ignite.stream.socket.SocketStreamer;
 
@@ -108,7 +109,7 @@ public class WordsSocketStreamerServer {
             }
         });
 
-        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+        sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<String, AffinityUuid, String>() {
             @Override public Map.Entry<AffinityUuid, String> extract(String word) {
                 // By using AffinityUuid we ensure that identical
                 // words are processed on the same cluster node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index ffa0821..e7d224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,8 +37,9 @@ import org.apache.ignite.IgniteDataStreamer;
  * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
-    /** Tuple extractor. */
-    private StreamTupleExtractor<T, K, V> extractor;
+
+    /** Tuple extractor that extracts a single tuple from an event (1:1 cardinality). */
+    private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
 
     /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
     private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
@@ -60,11 +61,22 @@ public abstract class StreamAdapter<T, K, V> {
      * Stream adapter.
      *
      * @param stmr Streamer.
-     * @param extractor Tuple extractor.
+     * @param extractor Tuple extractor (1:1).
      */
-    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamSingleTupleExtractor<T, K, V> extractor) {
         this.stmr = stmr;
-        this.extractor = extractor;
+        this.singleTupleExtractor = extractor;
+    }
+
+    /**
+     * Stream adapter.
+     *
+     * @param stmr Streamer.
+     * @param extractor Tuple extractor (1:n).
+     */
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamMultipleTupleExtractor<T, K, V> extractor) {
+        this.stmr = stmr;
+        this.multipleTupleExtractor = extractor;
     }
 
     /**
@@ -83,16 +95,44 @@ public abstract class StreamAdapter<T, K, V> {
 
     /**
      * @return Provided tuple extractor.
+     * @deprecated Use {@link #getSingleTupleExtractor()} instead.
      */
+    @Deprecated
     public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        return extractor;
+        if (singleTupleExtractor instanceof StreamTupleExtractor) {
+            return (StreamTupleExtractor) singleTupleExtractor;
+        }
+        throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
+            "StreamTupleExtractor; use getSingleTupleExtractor instead");
     }
 
     /**
-     * @param extractor Extractor for key-value tuples from messages.
+     * @param extractor Extractor for a single key-value tuple from the message.
+     * @deprecated Use {@link #setSingleTupleExtractor(StreamSingleTupleExtractor)} instead.
      */
+    @Deprecated
     public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        this.extractor = extractor;
+        if (multipleTupleExtractor != null) {
+            throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+        }
+        this.singleTupleExtractor = extractor;
+    }
+
+    /**
+     * @return Provided single tuple extractor.
+     */
+    public StreamSingleTupleExtractor<T, K, V> getSingleTupleExtractor() {
+        return singleTupleExtractor;
+    }
+
+    /**
+     * @param singleTupleExtractor Extractor for a single key-value tuple from a message.
+     */
+    public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
+        if (multipleTupleExtractor != null) {
+            throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+        }
+        this.singleTupleExtractor = singleTupleExtractor;
     }
 
     /**
@@ -106,6 +146,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
      */
     public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+        if (singleTupleExtractor != null) {
+            throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
+        }
         this.multipleTupleExtractor = multipleTupleExtractor;
     }
 
@@ -126,15 +169,12 @@ public abstract class StreamAdapter<T, K, V> {
     /**
      * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
      * underlying streamer.
-     * <p>
-     * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
-     * and the latter will be ignored.
      *
      * @param msg Message to convert.
      */
     protected void addMessage(T msg) {
         if (multipleTupleExtractor == null) {
-            Map.Entry<K, V> e = extractor.extract(msg);
+            Map.Entry<K, V> e = singleTupleExtractor.extract(msg);
 
             if (e != null)
                 stmr.addData(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
new file mode 100644
index 0000000..fd50e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a message to a single Ignite key-value tuple.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be used in cases where a single message/event may
+ * produce more than one tuple.
+ * <p>
+ * NOTE: This interface supersedes the former {@link StreamTupleExtractor} which is now deprecated.
+ *
+ * @see StreamMultipleTupleExtractor
+ */
+public interface StreamSingleTupleExtractor<T, K, V> {
+    /**
+     * Extracts a key-value tuple from a message.
+     *
+     * @param msg Message.
+     * @return Key-value tuple, or {@code null} to skip the message (null results are not added to the streamer).
+     */
+    public Map.Entry<K, V> extract(T msg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index aed7d8a..5cd8d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -17,22 +17,13 @@
 
 package org.apache.ignite.stream;
 
-import java.util.Map;
-
 /**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- * <p>
- * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
- * produce more than one tuple.
+ * This interface is retained only for backwards compatibility; it has been superseded by {@link StreamSingleTupleExtractor}.
  *
- * @see StreamMultipleTupleExtractor
+ * @deprecated Use {@link StreamSingleTupleExtractor} instead; this interface will be removed in 2.0.0.
+ * @see StreamSingleTupleExtractor
  */
-public interface StreamTupleExtractor<T, K, V> {
-    /**
-     * Extracts a key-value tuple from a message.
-     *
-     * @param msg Message.
-     * @return Key-value tuple.
-     */
-    public Map.Entry<K, V> extract(T msg);
-}
\ No newline at end of file
+@Deprecated
+public interface StreamTupleExtractor<T, K, V> extends StreamSingleTupleExtractor<T, K, V> {
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index c89952d..066a5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
             "tupleExtractor (single or multiple)");
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 8b05754..6786b7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -44,9 +45,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -292,7 +294,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
             sockStmr.setDelimiter(delim);
 
             if (oneMessagePerTuple) {
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+                sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Message, Integer, String>() {
                     @Override public Map.Entry<Integer, String> extract(Message msg) {
                         return new IgniteBiTuple<>(msg.key, msg.val);
                     }