You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/23 17:45:17 UTC
[02/30] ignite git commit: IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.
IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1dee96e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1dee96e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1dee96e
Branch: refs/heads/ignite-843
Commit: b1dee96e0d2b6130959400615b29b02721075392
Parents: 4d9734a
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:48:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 00:48:46 2015 +0100
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 5 +-
.../org/apache/ignite/stream/StreamAdapter.java | 64 ++++++++++++++++----
.../stream/StreamSingleTupleExtractor.java | 40 ++++++++++++
.../ignite/stream/StreamTupleExtractor.java | 23 +++----
.../ignite/stream/socket/SocketStreamer.java | 2 +-
.../stream/socket/SocketStreamerSelfTest.java | 6 +-
6 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 8e961b9..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.examples.streaming.wordcount.socket;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.util.Map;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
@@ -31,7 +32,7 @@ import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.streaming.wordcount.CacheConfig;
import org.apache.ignite.examples.streaming.wordcount.QueryWords;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.stream.StreamTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.socket.SocketMessageConverter;
import org.apache.ignite.stream.socket.SocketStreamer;
@@ -108,7 +109,7 @@ public class WordsSocketStreamerServer {
}
});
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<String, AffinityUuid, String>() {
@Override public Map.Entry<AffinityUuid, String> extract(String word) {
// By using AffinityUuid we ensure that identical
// words are processed on the same cluster node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index ffa0821..e7d224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,8 +37,9 @@ import org.apache.ignite.IgniteDataStreamer;
* </ol>
*/
public abstract class StreamAdapter<T, K, V> {
- /** Tuple extractor. */
- private StreamTupleExtractor<T, K, V> extractor;
+
+ /** Tuple extractor that extracts a single tuple from an event (1:1 cardinality). */
+ private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
/** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
@@ -60,11 +61,22 @@ public abstract class StreamAdapter<T, K, V> {
* Stream adapter.
*
* @param stmr Streamer.
- * @param extractor Tuple extractor.
+ * @param extractor Tuple extractor (1:1).
*/
- protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamSingleTupleExtractor<T, K, V> extractor) {
this.stmr = stmr;
- this.extractor = extractor;
+ this.singleTupleExtractor = extractor;
+ }
+
+ /**
+ * Stream adapter.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor (1:n).
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamMultipleTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.multipleTupleExtractor = extractor;
}
/**
@@ -83,16 +95,44 @@ public abstract class StreamAdapter<T, K, V> {
/**
* @return Provided tuple extractor.
+ * @see #getSingleTupleExtractor()
*/
+ @Deprecated
public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- return extractor;
+ if (singleTupleExtractor instanceof StreamTupleExtractor) {
+ return (StreamTupleExtractor) singleTupleExtractor;
+ }
+ throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
+ "StreamTupleExtractor; use getSingleTupleExtractor instead");
}
/**
- * @param extractor Extractor for key-value tuples from messages.
+ * @param extractor Extractor for a single key-value tuple from the message.
+ * @see #setSingleTupleExtractor(StreamSingleTupleExtractor)
*/
+ @Deprecated
public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- this.extractor = extractor;
+ if (multipleTupleExtractor != null) {
+ throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+ }
+ this.singleTupleExtractor = extractor;
+ }
+
+ /**
+ * @return Provided single tuple extractor.
+ */
+ public StreamSingleTupleExtractor<T, K, V> getSingleTupleExtractor() {
+ return singleTupleExtractor;
+ }
+
+ /**
+ * @param singleTupleExtractor Extractor for a single key-value tuple from a message.
+ */
+ public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
+ if (multipleTupleExtractor != null) {
+ throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+ }
+ this.singleTupleExtractor = singleTupleExtractor;
}
/**
@@ -106,6 +146,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param multipleTupleExtractor Extractor for 1:n tuple extraction.
*/
public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+ if (singleTupleExtractor != null) {
+ throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
+ }
this.multipleTupleExtractor = multipleTupleExtractor;
}
@@ -126,15 +169,12 @@ public abstract class StreamAdapter<T, K, V> {
/**
* Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
* underlying streamer.
- * <p>
- * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
- * and the latter will be ignored.
*
* @param msg Message to convert.
*/
protected void addMessage(T msg) {
if (multipleTupleExtractor == null) {
- Map.Entry<K, V> e = extractor.extract(msg);
+ Map.Entry<K, V> e = singleTupleExtractor.extract(msg);
if (e != null)
stmr.addData(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
new file mode 100644
index 0000000..fd50e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a message to a single Ignite key-value tuple.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be used in cases where a single message/event may
+ * produce more than one tuple.
+ * <p>
+ * NOTE: This interface supersedes the former {@link StreamTupleExtractor}, which is now deprecated.
+ *
+ * @see StreamMultipleTupleExtractor
+ */
+public interface StreamSingleTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message.
+ * @return Key-value tuple.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index aed7d8a..5cd8d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -17,22 +17,13 @@
package org.apache.ignite.stream;
-import java.util.Map;
-
/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- * <p>
- * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
- * produce more than one tuple.
+ * This interface is retained only for backwards compatibility; it has been superseded by {@link StreamSingleTupleExtractor}.
*
- * @see StreamMultipleTupleExtractor
+ * @deprecated Use {@link StreamSingleTupleExtractor} instead; this interface will be removed in 2.0.0.
+ * @see StreamSingleTupleExtractor
*/
-public interface StreamTupleExtractor<T, K, V> {
- /**
- * Extracts a key-value tuple from a message.
- *
- * @param msg Message.
- * @return Key-value tuple.
- */
- public Map.Entry<K, V> extract(T msg);
-}
\ No newline at end of file
+@Deprecated
+public interface StreamTupleExtractor<T, K, V> extends StreamSingleTupleExtractor<T, K, V> {
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index c89952d..066a5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* @throws IgniteException If failed.
*/
public void start() {
- A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+ A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
"tupleExtractor (single or multiple)");
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 8b05754..6786b7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -44,9 +45,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -292,7 +294,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
sockStmr.setDelimiter(delim);
if (oneMessagePerTuple) {
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+ sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Message, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(Message msg) {
return new IgniteBiTuple<>(msg.key, msg.val);
}