You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/09/21 17:34:02 UTC

[1/4] ignite git commit: IGNITE-1370 Streamers: Implement multiple tuple extractor.

Repository: ignite
Updated Branches:
  refs/heads/master eae4df1e5 -> 421a5234b


IGNITE-1370 Streamers: Implement multiple tuple extractor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d9734a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d9734a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d9734a0

Branch: refs/heads/master
Commit: 4d9734a0842f0a46310c1f6261fdf42371db8705
Parents: b736c46
Author: Raul Kripalani <ra...@apache.org>
Authored: Thu Sep 3 23:31:08 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Sun Sep 13 01:20:24 2015 +0100

----------------------------------------------------------------------
 .../org/apache/ignite/stream/StreamAdapter.java |  48 ++++++++-
 .../stream/StreamMultipleTupleExtractor.java    |  38 +++++++
 .../ignite/stream/StreamTupleExtractor.java     |   5 +
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 104 +++++++++++++++----
 5 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 97edcbb..ffa0821 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.stream;
 
 import java.util.Map;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 
@@ -26,11 +27,22 @@ import org.apache.ignite.IgniteDataStreamer;
  * streaming from different data sources. The purpose of adapters is to
  * convert different message formats into Ignite stream key-value tuples
  * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ * <p>
+ * Two types of tuple extractors are supported:
+ * <ol>
+ *     <li>A single tuple extractor, which extracts either no or 1 tuple out of a message. See
+ *     {@link #setTupleExtractor(StreamTupleExtractor)}.</li>
+ *     <li>A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the
+ *     form of a {@code Map<K, V>}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.</li>
+ * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
     /** Tuple extractor. */
     private StreamTupleExtractor<T, K, V> extractor;
 
+    /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
+    private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
+
     /** Streamer. */
     private IgniteDataStreamer<K, V> stmr;
 
@@ -84,6 +96,20 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
+     * @return Provided tuple extractor (for 1:n cardinality).
+     */
+    public StreamMultipleTupleExtractor<T, K, V> getMultipleTupleExtractor() {
+        return multipleTupleExtractor;
+    }
+
+    /**
+     * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
+     */
+    public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+        this.multipleTupleExtractor = multipleTupleExtractor;
+    }
+
+    /**
      * @return Provided {@link Ignite} instance.
      */
     public Ignite getIgnite() {
@@ -98,14 +124,28 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
-     * Converts given message to a tuple and adds it to the underlying streamer.
+     * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
+     * underlying streamer.
+     * <p>
+     * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
+     * and the latter will be ignored.
      *
      * @param msg Message to convert.
      */
     protected void addMessage(T msg) {
-        Map.Entry<K, V> e = extractor.extract(msg);
+        if (multipleTupleExtractor == null) {
+            Map.Entry<K, V> e = extractor.extract(msg);
+
+            if (e != null)
+                stmr.addData(e);
 
-        if (e != null)
-            stmr.addData(e);
+        } else {
+            Map<K, V> m = multipleTupleExtractor.extract(msg);
+
+            if (m != null)
+                stmr.addData(m);
+
+        }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
new file mode 100644
index 0000000..71ad45a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a single message to zero, one or many tuples.
+ * <p>
+ * For cases where cardinality will always be 1:1 (or 0:1), you may consider {@link StreamTupleExtractor}.
+ *
+ * @see StreamTupleExtractor
+ */
+public interface StreamMultipleTupleExtractor<T, K, V> {
+
+    /**
+     * Extracts a set of key-values from a message.
+     *
+     * @param msg Message.
+     * @return Map containing resulting tuples.
+     */
+    public Map<K, V> extract(T msg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index b6150ab..aed7d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -21,6 +21,11 @@ import java.util.Map;
 
 /**
  * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
+ * produce more than one tuple.
+ *
+ * @see StreamMultipleTupleExtractor
  */
 public interface StreamTupleExtractor<T, K, V> {
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 0d27af9..c89952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+            "tupleExtractor (single or multiple)");
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");
         A.ensure(threads > 0, "threads > 0");

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 185599d..8b05754 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -43,6 +44,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -111,7 +113,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     Marshaller marsh = new JdkMarshaller();
 
                     for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
+                        byte[] msg = marsh.marshal(new Message(i));
 
                         os.write(msg.length >>> 24);
                         os.write(msg.length >>> 16);
@@ -125,21 +127,52 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultipleEntriesFromOneMessage() throws Exception {
+        test(null, null, new Runnable() {
+            @Override public void run() {
+                try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                     OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+                    Marshaller marsh = new JdkMarshaller();
+
+                    int[] values = new int[CNT];
+                    for (int i = 0; i < CNT; i++) {
+                        values[i] = i;
+                    }
+
+                    byte[] msg = marsh.marshal(new Message(values));
+
+                    os.write(msg.length >>> 24);
+                    os.write(msg.length >>> 16);
+                    os.write(msg.length >>> 8);
+                    os.write(msg.length);
+
+                    os.write(msg);
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testSizeBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
-            @Override public Tuple convert(byte[] msg) {
+        SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+            @Override public Message convert(byte[] msg) {
                 int i = (msg[0] & 0xFF) << 24;
                 i |= (msg[1] & 0xFF) << 16;
                 i |= (msg[2] & 0xFF) << 8;
                 i |= msg[3] & 0xFF;
 
-                return new Tuple(i);
+                return new Message(i);
             }
         };
 
@@ -164,7 +197,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
     }
 
     /**
@@ -178,7 +211,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     Marshaller marsh = new JdkMarshaller();
 
                     for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
+                        byte[] msg = marsh.marshal(new Message(i));
 
                         os.write(msg);
                         os.write(DELIM);
@@ -188,7 +221,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
 
     }
 
@@ -196,14 +229,14 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testDelimiterBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
-            @Override public Tuple convert(byte[] msg) {
+        SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+            @Override public Message convert(byte[] msg) {
                 int i = (msg[0] & 0xFF) << 24;
                 i |= (msg[1] & 0xFF) << 16;
                 i |= (msg[2] & 0xFF) << 8;
                 i |= msg[3] & 0xFF;
 
-                return new Tuple(i);
+                return new Message(i);
             }
         };
 
@@ -225,16 +258,17 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
     }
 
     /**
      * @param converter Converter.
      * @param r Runnable..
      */
-    private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+    private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
+        boolean oneMessagePerTuple) throws Exception
     {
-        SocketStreamer<Tuple, Integer, String> sockStmr = null;
+        SocketStreamer<Message, Integer, String> sockStmr = null;
 
         Ignite ignite = grid(0);
 
@@ -257,11 +291,24 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
 
             sockStmr.setDelimiter(delim);
 
-            sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
-                @Override public Map.Entry<Integer, String> extract(Tuple msg) {
-                    return new IgniteBiTuple<>(msg.key, msg.val);
-                }
-            });
+            if (oneMessagePerTuple) {
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+                    @Override public Map.Entry<Integer, String> extract(Message msg) {
+                        return new IgniteBiTuple<>(msg.key, msg.val);
+                    }
+                });
+            }
+            else {
+                sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Message, Integer, String>() {
+                    @Override public Map<Integer, String> extract(Message msg) {
+                        Map<Integer, String> answer = new HashMap<>();
+                        for (int value : msg.values) {
+                            answer.put(value, Integer.toString(value));
+                        }
+                        return answer;
+                    }
+                });
+            }
 
             if (converter != null)
                 sockStmr.setConverter(converter);
@@ -297,9 +344,9 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Tuple.
+     * Message.
      */
-    private static class Tuple implements Serializable {
+    private static class Message implements Serializable {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
@@ -309,12 +356,25 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
         /** Value. */
         private final String val;
 
+        /** Multiple values. */
+        private final int[] values;
+
         /**
          * @param key Key.
          */
-        Tuple(int key) {
+        Message(int key) {
             this.key = key;
             this.val = Integer.toString(key);
+            this.values = new int[0];
+        }
+
+        /**
+         * @param values Multiple values.
+         */
+        Message(int[] values) {
+            this.key = -1;
+            this.val = null;
+            this.values = values;
         }
     }
 }
\ No newline at end of file


[4/4] ignite git commit: IGNITE-1370 Refactor StreamTupleExtractor API for 0..1 and 0..n extraction.

Posted by ra...@apache.org.
IGNITE-1370 Refactor StreamTupleExtractor API for 0..1 and 0..n extraction.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/421a5234
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/421a5234
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/421a5234

Branch: refs/heads/master
Commit: 421a5234b5a7e56e36952a4c1976b3118310073e
Parents: eae4df1 b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:28:12 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:28:12 2015 +0100

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |   5 +-
 .../org/apache/ignite/stream/StreamAdapter.java | 104 +++++++++++++++--
 .../stream/StreamMultipleTupleExtractor.java    |  38 +++++++
 .../stream/StreamSingleTupleExtractor.java      |  40 +++++++
 .../ignite/stream/StreamTupleExtractor.java     |  20 ++--
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 112 ++++++++++++++-----
 7 files changed, 270 insertions(+), 52 deletions(-)
----------------------------------------------------------------------



[3/4] ignite git commit: Merge branch 'master' into feature/ignite-1370.

Posted by ra...@apache.org.
Merge branch 'master' into feature/ignite-1370.

# Conflicts:
#	modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b80b1719
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b80b1719
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b80b1719

Branch: refs/heads/master
Commit: b80b17196269a036da65d781eb772a317039b014
Parents: b1dee96 6e48c9c
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 18:28:09 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 18:28:09 2015 +0100

----------------------------------------------------------------------
 RELEASE_NOTES.txt                               |    2 +-
 examples/config/example-default.xml             |   76 -
 examples/config/example-ignite.xml              |   56 +-
 examples/config/filesystem/README.txt           |    2 +-
 examples/config/filesystem/example-igfs.xml     |    7 -
 .../config/portable/example-ignite-portable.xml |   44 -
 .../ignite/examples/portable/Address.java       |   72 -
 .../ignite/examples/portable/Employee.java      |   93 -
 .../ignite/examples/portable/EmployeeKey.java   |   90 -
 .../portable/ExamplePortableNodeStartup.java    |   36 -
 .../ignite/examples/portable/Organization.java  |   93 -
 .../examples/portable/OrganizationType.java     |   32 -
 ...mputeClientPortableTaskExecutionExample.java |  154 -
 .../portable/computegrid/ComputeClientTask.java |  116 -
 .../portable/computegrid/package-info.java      |   21 -
 .../CacheClientPortablePutGetExample.java       |  230 --
 .../CacheClientPortableQueryExample.java        |  325 --
 .../portable/datagrid/package-info.java         |   21 -
 .../ignite/examples/portable/package-info.java  |   21 -
 .../CacheClientPortableExampleTest.java         |   46 -
 .../ComputeClientPortableExampleTest.java       |   37 -
 .../testsuites/IgniteExamplesSelfTestSuite.java |    6 -
 modules/clients/pom.xml                         |    7 +
 modules/clients/src/test/config/jdbc-config.xml |   55 +
 .../jdbc2/JdbcComplexQuerySelfTest.java         |  316 ++
 .../internal/jdbc2/JdbcConnectionSelfTest.java  |  268 ++
 .../internal/jdbc2/JdbcEmptyCacheSelfTest.java  |  140 +
 .../internal/jdbc2/JdbcLocalCachesSelfTest.java |  156 +
 .../internal/jdbc2/JdbcMetadataSelfTest.java    |  334 ++
 .../jdbc2/JdbcPreparedStatementSelfTest.java    |  730 ++++
 .../internal/jdbc2/JdbcResultSetSelfTest.java   |  751 ++++
 .../internal/jdbc2/JdbcStatementSelfTest.java   |  292 ++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   11 +
 modules/core/pom.xml                            |   21 -
 .../src/main/java/org/apache/ignite/Ignite.java |    7 -
 .../java/org/apache/ignite/IgniteCache.java     |   44 +-
 .../org/apache/ignite/IgniteJdbcDriver.java     |  281 +-
 .../java/org/apache/ignite/IgnitePortables.java |  370 --
 .../apache/ignite/IgniteSystemProperties.java   |    5 +-
 .../configuration/CacheConfiguration.java       |   70 +-
 .../ignite/internal/GridKernalContext.java      |    7 +-
 .../ignite/internal/GridKernalContextImpl.java  |   10 +-
 .../apache/ignite/internal/GridLoggerProxy.java |    6 +-
 .../org/apache/ignite/internal/IgniteEx.java    |    9 +
 .../apache/ignite/internal/IgniteKernal.java    |   14 +-
 .../ignite/internal/IgniteNodeAttributes.java   |    5 +-
 .../internal/executor/GridExecutorService.java  |    4 +-
 .../ignite/internal/jdbc/JdbcConnection.java    |    4 +
 .../internal/jdbc/JdbcConnectionInfo.java       |   91 -
 .../internal/jdbc/JdbcDatabaseMetadata.java     |    4 +
 .../internal/jdbc/JdbcPreparedStatement.java    |    4 +
 .../ignite/internal/jdbc/JdbcResultSet.java     |    4 +
 .../internal/jdbc/JdbcResultSetMetadata.java    |    4 +
 .../ignite/internal/jdbc/JdbcStatement.java     |    4 +
 .../apache/ignite/internal/jdbc/JdbcUtils.java  |    4 +
 .../ignite/internal/jdbc2/JdbcConnection.java   |  777 ++++
 .../internal/jdbc2/JdbcDatabaseMetadata.java    | 1401 +++++++
 .../internal/jdbc2/JdbcPreparedStatement.java   |  411 ++
 .../ignite/internal/jdbc2/JdbcQueryTask.java    |  361 ++
 .../ignite/internal/jdbc2/JdbcResultSet.java    | 1520 +++++++
 .../internal/jdbc2/JdbcResultSetMetadata.java   |  171 +
 .../ignite/internal/jdbc2/JdbcStatement.java    |  456 +++
 .../apache/ignite/internal/jdbc2/JdbcUtils.java |  155 +
 .../deployment/GridDeploymentStoreAdapter.java  |    4 +-
 .../discovery/GridDiscoveryManager.java         |   10 -
 .../portable/GridPortableMarshaller.java        |    2 +-
 .../portable/PortableClassDescriptor.java       |   10 +-
 .../internal/portable/PortableContext.java      |   32 +-
 .../portable/PortableMetaDataCollector.java     |    6 +-
 .../portable/PortableMetaDataHandler.java       |    4 +-
 .../internal/portable/PortableMetaDataImpl.java |   14 +-
 .../internal/portable/PortableObjectEx.java     |    6 +-
 .../internal/portable/PortableObjectImpl.java   |    6 +-
 .../portable/PortableObjectOffheapImpl.java     |    6 +-
 .../internal/portable/PortableRawReaderEx.java  |    4 +-
 .../internal/portable/PortableRawWriterEx.java  |    4 +-
 .../portable/PortableReaderContext.java         |    2 +-
 .../internal/portable/PortableReaderExImpl.java |   10 +-
 .../ignite/internal/portable/PortableUtils.java |    2 +-
 .../internal/portable/PortableWriterExImpl.java |   11 +-
 .../internal/portable/api/IgnitePortables.java  |  362 ++
 .../internal/portable/api/PortableBuilder.java  |  136 +
 .../portable/api/PortableException.java         |   57 +
 .../internal/portable/api/PortableIdMapper.java |   54 +
 .../api/PortableInvalidClassException.java      |   58 +
 .../portable/api/PortableMarshalAware.java      |   48 +
 .../portable/api/PortableMarshaller.java        |  358 ++
 .../internal/portable/api/PortableMetadata.java |   60 +
 .../internal/portable/api/PortableObject.java   |  152 +
 .../portable/api/PortableProtocolVersion.java   |   41 +
 .../portable/api/PortableRawReader.java         |  234 ++
 .../portable/api/PortableRawWriter.java         |  219 +
 .../internal/portable/api/PortableReader.java   |  284 ++
 .../portable/api/PortableSerializer.java        |   47 +
 .../portable/api/PortableTypeConfiguration.java |  195 +
 .../internal/portable/api/PortableWriter.java   |  266 ++
 .../portable/builder/PortableBuilderEnum.java   |    2 +-
 .../portable/builder/PortableBuilderImpl.java   |   14 +-
 .../portable/builder/PortableBuilderReader.java |    2 +-
 .../builder/PortableBuilderSerializer.java      |    2 +-
 .../builder/PortableEnumArrayLazyValue.java     |    4 +-
 .../builder/PortableObjectArrayLazyValue.java   |    2 +-
 .../builder/PortablePlainPortableObject.java    |    2 +-
 .../streams/PortableAbstractInputStream.java    |    2 +-
 .../processors/cache/GridCacheAdapter.java      |   31 +-
 .../cache/GridCacheClearAllRunnable.java        |    4 +-
 .../processors/cache/GridCacheContext.java      |    4 +-
 .../processors/cache/GridCacheIoManager.java    |    4 +-
 .../processors/cache/GridCacheLogger.java       |    4 +-
 .../processors/cache/GridCacheMapEntry.java     |   15 +-
 .../processors/cache/GridCacheMvcc.java         |    5 +-
 .../processors/cache/GridCacheProcessor.java    |   37 +-
 .../cache/GridCacheSharedContext.java           |    4 +-
 .../processors/cache/GridCacheSwapManager.java  |   46 +-
 .../processors/cache/IgniteCacheProxy.java      |    5 -
 .../distributed/GridCacheTxRecoveryFuture.java  |   11 +-
 .../distributed/GridDistributedCacheEntry.java  |    6 +-
 .../GridDistributedTxFinishRequest.java         |   13 +-
 .../GridDistributedTxRemoteAdapter.java         |   18 +-
 .../dht/GridClientPartitionTopology.java        |  104 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    1 +
 .../dht/GridDhtPartitionTopology.java           |    4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   16 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  514 ++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   15 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |   84 +-
 .../dht/GridDhtTxFinishResponse.java            |   89 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |    4 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   67 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   34 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   40 +-
 .../dht/GridPartitionedGetFuture.java           |    4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |    4 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   11 +-
 .../distributed/near/GridNearLockFuture.java    |   11 +-
 .../distributed/near/GridNearLockRequest.java   |   18 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   55 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   11 +-
 .../near/GridNearTxFinishFuture.java            |  345 +-
 .../near/GridNearTxFinishRequest.java           |   20 +-
 .../cache/distributed/near/GridNearTxLocal.java |   64 +-
 .../distributed/near/GridNearTxRemote.java      |   38 +-
 .../CacheDefaultPortableAffinityKeyMapper.java  |    2 +-
 .../portable/CacheObjectPortableContext.java    |    2 +-
 .../portable/CacheObjectPortableProcessor.java  |    8 +-
 .../CacheObjectPortableProcessorImpl.java       |   12 +-
 .../cache/portable/IgnitePortablesImpl.java     |   10 +-
 .../cache/store/CacheOsStoreManager.java        |    4 +-
 .../cache/transactions/IgniteTxAdapter.java     |    5 +-
 .../cache/transactions/IgniteTxHandler.java     |  286 +-
 .../transactions/IgniteTxLocalAdapter.java      |   37 +-
 .../cache/transactions/IgniteTxManager.java     |   48 +-
 .../continuous/GridContinuousProcessor.java     |   22 +-
 .../datastructures/DataStructuresProcessor.java |  102 +-
 .../datastructures/GridCacheAtomicLongImpl.java |    4 +-
 .../GridCacheAtomicReferenceImpl.java           |    4 +-
 .../GridCacheAtomicSequenceImpl.java            |    4 +-
 .../GridCacheAtomicStampedImpl.java             |    4 +-
 .../GridCacheCountDownLatchImpl.java            |    4 +-
 .../GridTransactionalCacheQueueImpl.java        |   15 +-
 .../processors/igfs/IgfsFileAffinityRange.java  |    4 +-
 .../processors/igfs/IgfsFileWorkerBatch.java    |   15 +-
 .../igfs/IgfsFragmentizerManager.java           |    8 +-
 .../processors/igfs/IgfsServerManager.java      |    5 +-
 .../internal/processors/job/GridJobWorker.java  |    4 +-
 .../dotnet/PlatformDotNetConfiguration.java     |   12 +-
 .../PlatformDotNetPortableConfiguration.java    |   12 +-
 ...PlatformDotNetPortableTypeConfiguration.java |   12 +-
 .../processors/task/GridTaskWorker.java         |    4 +-
 .../util/GridSpiCloseableIteratorWrapper.java   |    5 +
 .../ignite/internal/util/IgniteUtils.java       |   16 +
 .../util/offheap/GridOffHeapEvictListener.java  |    2 +-
 .../util/offheap/GridOffHeapPartitionedMap.java |    1 +
 .../util/offheap/unsafe/GridUnsafeMap.java      |    4 +-
 .../marshaller/portable/PortableMarshaller.java |  358 --
 .../marshaller/portable/package-info.java       |   22 -
 .../apache/ignite/portable/PortableBuilder.java |  137 -
 .../ignite/portable/PortableException.java      |   57 -
 .../ignite/portable/PortableIdMapper.java       |   56 -
 .../portable/PortableInvalidClassException.java |   58 -
 .../ignite/portable/PortableMarshalAware.java   |   48 -
 .../ignite/portable/PortableMetadata.java       |   61 -
 .../apache/ignite/portable/PortableObject.java  |  154 -
 .../portable/PortableProtocolVersion.java       |   41 -
 .../ignite/portable/PortableRawReader.java      |  234 --
 .../ignite/portable/PortableRawWriter.java      |  219 -
 .../apache/ignite/portable/PortableReader.java  |  284 --
 .../ignite/portable/PortableSerializer.java     |   49 -
 .../portable/PortableTypeConfiguration.java     |  196 -
 .../apache/ignite/portable/PortableWriter.java  |  266 --
 .../apache/ignite/portable/package-info.java    |   22 -
 .../resources/META-INF/classnames.properties    |   20 +-
 .../GridDiscoveryManagerAttributesSelfTest.java |   45 -
 .../GridPortableAffinityKeySelfTest.java        |  218 -
 .../GridPortableBuilderAdditionalSelfTest.java  | 1226 ------
 .../portable/GridPortableBuilderSelfTest.java   | 1021 -----
 ...eBuilderStringAsCharsAdditionalSelfTest.java |   28 -
 ...ridPortableBuilderStringAsCharsSelfTest.java |   28 -
 ...idPortableMarshallerCtxDisabledSelfTest.java |  256 --
 .../GridPortableMarshallerSelfTest.java         | 3807 ------------------
 .../GridPortableMetaDataDisabledSelfTest.java   |  238 --
 .../portable/GridPortableMetaDataSelfTest.java  |  369 --
 .../portable/GridPortableWildcardsSelfTest.java |  482 ---
 .../GridPortableMarshalerAwareTestClass.java    |   67 -
 .../mutabletest/GridPortableTestClasses.java    |  434 --
 .../portable/mutabletest/package-info.java      |   22 -
 .../ignite/internal/portable/package-info.java  |   22 -
 .../portable/test/GridPortableTestClass1.java   |   28 -
 .../portable/test/GridPortableTestClass2.java   |   24 -
 .../internal/portable/test/package-info.java    |   22 -
 .../test/subpackage/GridPortableTestClass3.java |   24 -
 .../portable/test/subpackage/package-info.java  |   22 -
 .../CacheStoreUsageMultinodeAbstractTest.java   |   16 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   57 +-
 .../GridCacheAbstractRemoveFailureTest.java     |  199 +-
 .../cache/GridCacheMemoryModeSelfTest.java      |    9 +-
 .../processors/cache/GridCacheMvccSelfTest.java |    4 +-
 .../cache/GridCacheP2PUndeploySelfTest.java     |   30 +-
 .../cache/GridCachePutAllFailoverSelfTest.java  |   88 +-
 .../GridCacheVariableTopologySelfTest.java      |    5 +-
 .../cache/IgniteCacheCreateRestartSelfTest.java |  106 +
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |   73 +
 .../cache/IgniteCachePutAllRestartTest.java     |    2 +
 .../cache/IgniteInternalCacheTypesTest.java     |    4 +-
 .../cache/IgniteOnePhaseCommitNearSelfTest.java |  243 ++
 .../IgniteTxExceptionAbstractSelfTest.java      |   29 +-
 .../GridCacheAbstractNodeRestartSelfTest.java   |  149 +-
 ...ridCachePartitionNotLoadedEventSelfTest.java |   27 +-
 .../GridCacheTransformEventSelfTest.java        |    5 +-
 .../GridCacheColocatedTxExceptionSelfTest.java  |    2 +-
 .../GridCacheDhtAtomicRemoveFailureTest.java    |   16 +-
 .../dht/GridCacheDhtRemoveFailureTest.java      |   16 +-
 .../dht/GridCacheTxNodeFailureSelfTest.java     |  405 ++
 .../dht/GridNearCacheTxNodeFailureSelfTest.java |   31 +
 ...gniteAtomicLongChangingTopologySelfTest.java |  278 ++
 .../IgniteCacheCrossCacheTxFailoverTest.java    |   53 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  166 +-
 .../dht/IgniteCachePutRetryAtomicSelfTest.java  |    2 +
 ...gniteCachePutRetryTransactionalSelfTest.java |   50 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   98 +-
 ...tomicPrimaryWriteOrderRemoveFailureTest.java |   15 +-
 .../GridCacheAtomicRemoveFailureTest.java       |   15 +-
 .../GridCacheAtomicNearRemoveFailureTest.java   |   15 +-
 ...cPrimaryWriteOrderNearRemoveFailureTest.java |   15 +-
 .../near/GridCacheNearRemoveFailureTest.java    |   15 +-
 .../near/GridCacheNearTxExceptionSelfTest.java  |    2 +-
 .../GridCachePartitionedNodeRestartTest.java    |    9 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |    9 +-
 .../near/IgniteCacheNearOnlyTxTest.java         |   14 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |    8 +-
 .../GridCacheReplicatedTxExceptionSelfTest.java |    2 +-
 .../GridCacheLocalTxExceptionSelfTest.java      |    2 +-
 ...ClientNodePortableMetadataMultinodeTest.java |  295 --
 ...GridCacheClientNodePortableMetadataTest.java |  286 --
 ...ableObjectsAbstractDataStreamerSelfTest.java |  190 -
 ...bleObjectsAbstractMultiThreadedSelfTest.java |  231 --
 ...ridCachePortableObjectsAbstractSelfTest.java |  978 -----
 .../GridCachePortableStoreAbstractSelfTest.java |  297 --
 .../GridCachePortableStoreObjectsSelfTest.java  |   55 -
 ...GridCachePortableStorePortablesSelfTest.java |   66 -
 ...ridPortableCacheEntryMemorySizeSelfTest.java |   55 -
 ...leDuplicateIndexObjectsAbstractSelfTest.java |  158 -
 .../DataStreamProcessorPortableSelfTest.java    |   66 -
 .../GridDataStreamerImplSelfTest.java           |  345 --
 ...ridCacheAffinityRoutingPortableSelfTest.java |   47 -
 ...lyPortableDataStreamerMultiNodeSelfTest.java |   29 -
 ...rtableDataStreamerMultithreadedSelfTest.java |   47 -
 ...artitionedOnlyPortableMultiNodeSelfTest.java |   28 -
 ...tionedOnlyPortableMultithreadedSelfTest.java |   47 -
 .../GridCacheMemoryModePortableSelfTest.java    |   36 -
 ...acheOffHeapTieredAtomicPortableSelfTest.java |   47 -
 ...eapTieredEvictionAtomicPortableSelfTest.java |   95 -
 ...heOffHeapTieredEvictionPortableSelfTest.java |   95 -
 .../GridCacheOffHeapTieredPortableSelfTest.java |   47 -
 ...ateIndexObjectPartitionedAtomicSelfTest.java |   38 -
 ...xObjectPartitionedTransactionalSelfTest.java |   41 -
 ...AtomicNearDisabledOffheapTieredSelfTest.java |   29 -
 ...rtableObjectsAtomicNearDisabledSelfTest.java |   51 -
 ...tableObjectsAtomicOffheapTieredSelfTest.java |   29 -
 .../GridCachePortableObjectsAtomicSelfTest.java |   51 -
 ...tionedNearDisabledOffheapTieredSelfTest.java |   30 -
 ...eObjectsPartitionedNearDisabledSelfTest.java |   51 -
 ...ObjectsPartitionedOffheapTieredSelfTest.java |   30 -
 ...CachePortableObjectsPartitionedSelfTest.java |   51 -
 ...sNearPartitionedByteArrayValuesSelfTest.java |   41 -
 ...sPartitionedOnlyByteArrayValuesSelfTest.java |   42 -
 ...dCachePortableObjectsReplicatedSelfTest.java |   51 -
 ...CachePortableObjectsAtomicLocalSelfTest.java |   32 -
 ...rtableObjectsLocalOffheapTieredSelfTest.java |   29 -
 .../GridCachePortableObjectsLocalSelfTest.java  |   51 -
 .../processors/igfs/IgfsAbstractSelfTest.java   |   61 +-
 .../igfs/IgfsDualAbstractSelfTest.java          |   10 +-
 .../processors/igfs/IgfsStartCacheTest.java     |    2 +-
 .../offheap/GridOffHeapMapAbstractSelfTest.java |    6 +-
 ...idOffHeapPartitionedMapAbstractSelfTest.java |   10 +-
 .../stream/socket/SocketStreamerSelfTest.java   |   29 +-
 .../ignite/testframework/GridTestUtils.java     |  117 +
 .../ignite/testframework/junits/IgniteMock.java |    8 +-
 .../multijvm/IgniteCacheProcessProxy.java       |    5 -
 .../junits/multijvm/IgniteProcessProxy.java     |    2 +-
 .../IgniteCacheFailoverTestSuite.java           |   15 +-
 .../IgniteCacheFailoverTestSuite3.java          |   62 +
 .../testsuites/IgniteCacheRestartTestSuite.java |   15 +-
 .../IgniteCacheRestartTestSuite2.java           |   47 +
 .../IgnitePortableCacheFullApiTestSuite.java    |   37 -
 .../IgnitePortableCacheTestSuite.java           |  103 -
 .../IgnitePortableObjectsTestSuite.java         |   92 -
 .../ignite/portable/test1/1.1/test1-1.1.jar     |  Bin 2548 -> 0 bytes
 .../ignite/portable/test1/1.1/test1-1.1.pom     |    9 -
 .../portable/test1/maven-metadata-local.xml     |   12 -
 .../ignite/portable/test2/1.1/test2-1.1.jar     |  Bin 1361 -> 0 bytes
 .../ignite/portable/test2/1.1/test2-1.1.pom     |    9 -
 .../portable/test2/maven-metadata-local.xml     |   12 -
 .../hadoop/SecondaryFileSystemProvider.java     |    4 +-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |  100 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |    6 +
 .../testsuites/IgniteHadoopTestSuite.java       |    6 +-
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |   31 +-
 ...CacheScanPartitionQueryFallbackSelfTest.java |  105 +-
 ...QueryOffheapEvictsMultiThreadedSelfTest.java |    5 -
 ...lientQueryReplicatedNodeRestartSelfTest.java |    8 +-
 .../IgniteCacheQueryNodeRestartSelfTest.java    |    4 +-
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   10 +-
 .../IgniteCacheReplicatedQuerySelfTest.java     |   49 +-
 .../IgnitePortableCacheQueryTestSuite.java      |  117 -
 .../platform/PlatformContextImpl.java           |    4 +-
 .../platform/compute/PlatformCompute.java       |    2 +-
 .../cpp/PlatformCppConfigurationClosure.java    |    2 +-
 .../PlatformDotNetConfigurationClosure.java     |    6 +-
 .../platform/events/PlatformEvents.java         |    2 +-
 .../services/PlatformAbstractService.java       |    3 +-
 .../Cache/CacheAbstractTest.cs                  |   71 +-
 .../Config/Compute/compute-grid1.xml            |    8 +-
 .../PlatformComputePortableArgTask.java         |    8 +-
 .../ignite/schema/generator/CodeGenerator.java  |    4 +-
 .../ignite/schema/model/PojoDescriptor.java     |    6 +-
 .../parser/dialect/OracleMetadataDialect.java   |    7 +-
 .../org/apache/ignite/IgniteSpringBean.java     |    7 -
 .../yardstick/config/benchmark-query.properties |    5 +-
 modules/yardstick/config/ignite-base-config.xml |    2 +-
 modules/yardstick/config/ignite-jdbc-config.xml |   55 +
 parent/pom.xml                                  |   10 -
 pom.xml                                         |   10 +
 343 files changed, 15499 insertions(+), 18992 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b80b1719/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 6786b7e,da15de3..1056990
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@@ -267,10 -233,10 +268,11 @@@ public class SocketStreamerSelfTest ext
       * @param converter Converter.
       * @param r Runnable..
       */
-     private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
-         boolean oneMessagePerTuple) throws Exception
-     {
 -    private void test(@Nullable SocketMessageConverter<Tuple> converter,
 -        @Nullable byte[] delim,
 -        Runnable r) throws Exception {
 -        SocketStreamer<Tuple, Integer, String> sockStmr = null;
++    private void test(@Nullable SocketMessageConverter<Message> converter, 
++        @Nullable byte[] delim, 
++        Runnable r,
++        boolean oneMessagePerTuple) throws Exception {
 +        SocketStreamer<Message, Integer, String> sockStmr = null;
  
          Ignite ignite = grid(0);
  


[2/4] ignite git commit: IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.

Posted by ra...@apache.org.
IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1dee96e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1dee96e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1dee96e

Branch: refs/heads/master
Commit: b1dee96e0d2b6130959400615b29b02721075392
Parents: 4d9734a
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:48:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 00:48:46 2015 +0100

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |  5 +-
 .../org/apache/ignite/stream/StreamAdapter.java | 64 ++++++++++++++++----
 .../stream/StreamSingleTupleExtractor.java      | 40 ++++++++++++
 .../ignite/stream/StreamTupleExtractor.java     | 23 +++----
 .../ignite/stream/socket/SocketStreamer.java    |  2 +-
 .../stream/socket/SocketStreamerSelfTest.java   |  6 +-
 6 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 8e961b9..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.examples.streaming.wordcount.socket;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.util.Map;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
@@ -31,7 +32,7 @@ import org.apache.ignite.examples.ExamplesUtils;
 import org.apache.ignite.examples.streaming.wordcount.CacheConfig;
 import org.apache.ignite.examples.streaming.wordcount.QueryWords;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.stream.StreamTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.socket.SocketMessageConverter;
 import org.apache.ignite.stream.socket.SocketStreamer;
 
@@ -108,7 +109,7 @@ public class WordsSocketStreamerServer {
             }
         });
 
-        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+        sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<String, AffinityUuid, String>() {
             @Override public Map.Entry<AffinityUuid, String> extract(String word) {
                 // By using AffinityUuid we ensure that identical
                 // words are processed on the same cluster node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index ffa0821..e7d224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,8 +37,9 @@ import org.apache.ignite.IgniteDataStreamer;
  * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
-    /** Tuple extractor. */
-    private StreamTupleExtractor<T, K, V> extractor;
+
+    /** Tuple extractor that extracts a single tuple from an event (1:1 cardinality). */
+    private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
 
     /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
     private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
@@ -60,11 +61,22 @@ public abstract class StreamAdapter<T, K, V> {
      * Stream adapter.
      *
      * @param stmr Streamer.
-     * @param extractor Tuple extractor.
+     * @param extractor Tuple extractor (1:1).
      */
-    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamSingleTupleExtractor<T, K, V> extractor) {
         this.stmr = stmr;
-        this.extractor = extractor;
+        this.singleTupleExtractor = extractor;
+    }
+
+    /**
+     * Stream adapter.
+     *
+     * @param stmr Streamer.
+     * @param extractor Tuple extractor (1:n).
+     */
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamMultipleTupleExtractor<T, K, V> extractor) {
+        this.stmr = stmr;
+        this.multipleTupleExtractor = extractor;
     }
 
     /**
@@ -83,16 +95,44 @@ public abstract class StreamAdapter<T, K, V> {
 
     /**
      * @return Provided tuple extractor.
+     * @see #getSingleTupleExtractor()
      */
+    @Deprecated
     public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        return extractor;
+        if (singleTupleExtractor instanceof StreamTupleExtractor) {
+            return (StreamTupleExtractor) singleTupleExtractor;
+        }
+        throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
+            "StreamTupleExtractor; use getSingleTupleExtractor instead");
     }
 
     /**
-     * @param extractor Extractor for key-value tuples from messages.
+     * @param extractor Extractor for a single key-value tuple from the message.
+     * @see #setSingleTupleExtractor(StreamSingleTupleExtractor)
      */
+    @Deprecated
     public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        this.extractor = extractor;
+        if (multipleTupleExtractor != null) {
+            throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+        }
+        this.singleTupleExtractor = extractor;
+    }
+
+    /**
+     * @return Provided single tuple extractor.
+     */
+    public StreamSingleTupleExtractor<T, K, V> getSingleTupleExtractor() {
+        return singleTupleExtractor;
+    }
+
+    /**
+     * @param singleTupleExtractor Extractor for a single key-value tuple from a message.
+     */
+    public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
+        if (multipleTupleExtractor != null) {
+            throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+        }
+        this.singleTupleExtractor = singleTupleExtractor;
     }
 
     /**
@@ -106,6 +146,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
      */
     public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+        if (singleTupleExtractor != null) {
+            throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
+        }
         this.multipleTupleExtractor = multipleTupleExtractor;
     }
 
@@ -126,15 +169,12 @@ public abstract class StreamAdapter<T, K, V> {
     /**
      * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
      * underlying streamer.
-     * <p>
-     * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
-     * and the latter will be ignored.
      *
      * @param msg Message to convert.
      */
     protected void addMessage(T msg) {
         if (multipleTupleExtractor == null) {
-            Map.Entry<K, V> e = extractor.extract(msg);
+            Map.Entry<K, V> e = singleTupleExtractor.extract(msg);
 
             if (e != null)
                 stmr.addData(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
new file mode 100644
index 0000000..fd50e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a message to a single Ignite key-value tuple.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be used in cases where a single message/event may
+ * produce more than one tuple.
+ * <p>
+ * NOTE: This interface supersedes the former {@link StreamTupleExtractor} which is now deprecated.
+ *
+ * @see StreamMultipleTupleExtractor
+ */
+public interface StreamSingleTupleExtractor<T, K, V> {
+    /**
+     * Extracts a key-value tuple from a message.
+     *
+     * @param msg Message.
+     * @return Key-value tuple.
+     */
+    public Map.Entry<K, V> extract(T msg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index aed7d8a..5cd8d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -17,22 +17,13 @@
 
 package org.apache.ignite.stream;
 
-import java.util.Map;
-
 /**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- * <p>
- * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
- * produce more than one tuple.
+ * Retained only for backwards compatibility; superseded by {@link StreamSingleTupleExtractor}.
  *
- * @see StreamMultipleTupleExtractor
+ * @deprecated Use {@link StreamSingleTupleExtractor} instead; will be removed in 2.0.0.
+ * @see StreamSingleTupleExtractor
  */
-public interface StreamTupleExtractor<T, K, V> {
-    /**
-     * Extracts a key-value tuple from a message.
-     *
-     * @param msg Message.
-     * @return Key-value tuple.
-     */
-    public Map.Entry<K, V> extract(T msg);
-}
\ No newline at end of file
+@Deprecated
+public interface StreamTupleExtractor<T, K, V> extends StreamSingleTupleExtractor<T, K, V> {
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index c89952d..066a5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
             "tupleExtractor (single or multiple)");
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 8b05754..6786b7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -44,9 +45,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -292,7 +294,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
             sockStmr.setDelimiter(delim);
 
             if (oneMessagePerTuple) {
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+                sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Message, Integer, String>() {
                     @Override public Map.Entry<Integer, String> extract(Message msg) {
                         return new IgniteBiTuple<>(msg.key, msg.val);
                     }