You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/09/21 17:34:02 UTC
[1/4] ignite git commit: IGNITE-1370 Streamers: Implement multiple
tuple extractor.
Repository: ignite
Updated Branches:
refs/heads/master eae4df1e5 -> 421a5234b
IGNITE-1370 Streamers: Implement multiple tuple extractor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d9734a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d9734a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d9734a0
Branch: refs/heads/master
Commit: 4d9734a0842f0a46310c1f6261fdf42371db8705
Parents: b736c46
Author: Raul Kripalani <ra...@apache.org>
Authored: Thu Sep 3 23:31:08 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Sun Sep 13 01:20:24 2015 +0100
----------------------------------------------------------------------
.../org/apache/ignite/stream/StreamAdapter.java | 48 ++++++++-
.../stream/StreamMultipleTupleExtractor.java | 38 +++++++
.../ignite/stream/StreamTupleExtractor.java | 5 +
.../ignite/stream/socket/SocketStreamer.java | 3 +-
.../stream/socket/SocketStreamerSelfTest.java | 104 +++++++++++++++----
5 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 97edcbb..ffa0821 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.stream;
import java.util.Map;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
@@ -26,11 +27,22 @@ import org.apache.ignite.IgniteDataStreamer;
* streaming from different data sources. The purpose of adapters is to
* convert different message formats into Ignite stream key-value tuples
* and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ * <p>
+ * Two types of tuple extractors are supported:
+ * <ol>
+ * <li>A single tuple extractor, which extracts either no or 1 tuple out of a message. See
+ * see {@link #setTupleExtractor(StreamTupleExtractor)}.</li>
+ * <li>A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the
+ * form of a {@link Map<K, V>}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.</li>
+ * </ol>
*/
public abstract class StreamAdapter<T, K, V> {
/** Tuple extractor. */
private StreamTupleExtractor<T, K, V> extractor;
+ /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
+ private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
+
/** Streamer. */
private IgniteDataStreamer<K, V> stmr;
@@ -84,6 +96,20 @@ public abstract class StreamAdapter<T, K, V> {
}
/**
+ * @return Provided tuple extractor (for 1:n cardinality).
+ */
+ public StreamMultipleTupleExtractor<T, K, V> getMultipleTupleExtractor() {
+ return multipleTupleExtractor;
+ }
+
+ /**
+ * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
+ */
+ public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+ this.multipleTupleExtractor = multipleTupleExtractor;
+ }
+
+ /**
* @return Provided {@link Ignite} instance.
*/
public Ignite getIgnite() {
@@ -98,14 +124,28 @@ public abstract class StreamAdapter<T, K, V> {
}
/**
- * Converts given message to a tuple and adds it to the underlying streamer.
+ * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
+ * underlying streamer.
+ * <p>
+ * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
+ * and the latter will be ignored.
*
* @param msg Message to convert.
*/
protected void addMessage(T msg) {
- Map.Entry<K, V> e = extractor.extract(msg);
+ if (multipleTupleExtractor == null) {
+ Map.Entry<K, V> e = extractor.extract(msg);
+
+ if (e != null)
+ stmr.addData(e);
- if (e != null)
- stmr.addData(e);
+ } else {
+ Map<K, V> m = multipleTupleExtractor.extract(msg);
+
+ if (m != null)
+ stmr.addData(m);
+
+ }
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
new file mode 100644
index 0000000..71ad45a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a single message to zero, one or many tuples.
+ * <p>
+ * For cases where cardinality will always be 1:1 (or 0:1), you may consider {@link StreamTupleExtractor}.
+ *
+ * @see StreamTupleExtractor
+ */
+public interface StreamMultipleTupleExtractor<T, K, V> {
+
+ /**
+ * Extracts a set of key-values from a message.
+ *
+ * @param msg Message.
+ * @return Map containing resulting tuples.
+ */
+ public Map<K, V> extract(T msg);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index b6150ab..aed7d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -21,6 +21,11 @@ import java.util.Map;
/**
* Stream tuple extractor to convert messages to Ignite key-value tuples.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
+ * produce more than one tuple.
+ *
+ * @see StreamMultipleTupleExtractor
*/
public interface StreamTupleExtractor<T, K, V> {
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 0d27af9..c89952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* @throws IgniteException If failed.
*/
public void start() {
- A.notNull(getTupleExtractor(), "tupleExtractor");
+ A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+ "tupleExtractor (single or multiple)");
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
A.ensure(threads > 0, "threads > 0");
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 185599d..8b05754 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -43,6 +44,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -111,7 +113,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
Marshaller marsh = new JdkMarshaller();
for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
+ byte[] msg = marsh.marshal(new Message(i));
os.write(msg.length >>> 24);
os.write(msg.length >>> 16);
@@ -125,21 +127,52 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultipleEntriesFromOneMessage() throws Exception {
+ test(null, null, new Runnable() {
+ @Override public void run() {
+ try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ int[] values = new int[CNT];
+ for (int i = 0; i < CNT; i++) {
+ values[i] = i;
+ }
+
+ byte[] msg = marsh.marshal(new Message(values));
+
+ os.write(msg.length >>> 24);
+ os.write(msg.length >>> 16);
+ os.write(msg.length >>> 8);
+ os.write(msg.length);
+
+ os.write(msg);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, false);
}
/**
* @throws Exception If failed.
*/
public void testSizeBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
+ SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+ @Override public Message convert(byte[] msg) {
int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8;
i |= msg[3] & 0xFF;
- return new Tuple(i);
+ return new Message(i);
}
};
@@ -164,7 +197,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
}
/**
@@ -178,7 +211,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
Marshaller marsh = new JdkMarshaller();
for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
+ byte[] msg = marsh.marshal(new Message(i));
os.write(msg);
os.write(DELIM);
@@ -188,7 +221,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
}
@@ -196,14 +229,14 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testDelimiterBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
+ SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+ @Override public Message convert(byte[] msg) {
int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8;
i |= msg[3] & 0xFF;
- return new Tuple(i);
+ return new Message(i);
}
};
@@ -225,16 +258,17 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
}
/**
* @param converter Converter.
* @param r Runnable..
*/
- private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+ private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
+ boolean oneMessagePerTuple) throws Exception
{
- SocketStreamer<Tuple, Integer, String> sockStmr = null;
+ SocketStreamer<Message, Integer, String> sockStmr = null;
Ignite ignite = grid(0);
@@ -257,11 +291,24 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
sockStmr.setDelimiter(delim);
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
- @Override public Map.Entry<Integer, String> extract(Tuple msg) {
- return new IgniteBiTuple<>(msg.key, msg.val);
- }
- });
+ if (oneMessagePerTuple) {
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(Message msg) {
+ return new IgniteBiTuple<>(msg.key, msg.val);
+ }
+ });
+ }
+ else {
+ sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Message, Integer, String>() {
+ @Override public Map<Integer, String> extract(Message msg) {
+ Map<Integer, String> answer = new HashMap<>();
+ for (int value : msg.values) {
+ answer.put(value, Integer.toString(value));
+ }
+ return answer;
+ }
+ });
+ }
if (converter != null)
sockStmr.setConverter(converter);
@@ -297,9 +344,9 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
}
/**
- * Tuple.
+ * Message.
*/
- private static class Tuple implements Serializable {
+ private static class Message implements Serializable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -309,12 +356,25 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
/** Value. */
private final String val;
+ /** Multiple values. */
+ private final int[] values;
+
/**
* @param key Key.
*/
- Tuple(int key) {
+ Message(int key) {
this.key = key;
this.val = Integer.toString(key);
+ this.values = new int[0];
+ }
+
+ /**
+ * @param values Multiple values.
+ */
+ Message(int[] values) {
+ this.key = -1;
+ this.val = null;
+ this.values = values;
}
}
}
\ No newline at end of file
[4/4] ignite git commit: IGNITE-1370 Refactor StreamTupleExtractor
API for 0..1 and 0..n extraction.
Posted by ra...@apache.org.
IGNITE-1370 Refactor StreamTupleExtractor API for 0..1 and 0..n extraction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/421a5234
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/421a5234
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/421a5234
Branch: refs/heads/master
Commit: 421a5234b5a7e56e36952a4c1976b3118310073e
Parents: eae4df1 b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:28:12 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:28:12 2015 +0100
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 5 +-
.../org/apache/ignite/stream/StreamAdapter.java | 104 +++++++++++++++--
.../stream/StreamMultipleTupleExtractor.java | 38 +++++++
.../stream/StreamSingleTupleExtractor.java | 40 +++++++
.../ignite/stream/StreamTupleExtractor.java | 20 ++--
.../ignite/stream/socket/SocketStreamer.java | 3 +-
.../stream/socket/SocketStreamerSelfTest.java | 112 ++++++++++++++-----
7 files changed, 270 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
[3/4] ignite git commit: Merge branch 'master' into
feature/ignite-1370.
Posted by ra...@apache.org.
Merge branch 'master' into feature/ignite-1370.
# Conflicts:
# modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b80b1719
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b80b1719
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b80b1719
Branch: refs/heads/master
Commit: b80b17196269a036da65d781eb772a317039b014
Parents: b1dee96 6e48c9c
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 18:28:09 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 18:28:09 2015 +0100
----------------------------------------------------------------------
RELEASE_NOTES.txt | 2 +-
examples/config/example-default.xml | 76 -
examples/config/example-ignite.xml | 56 +-
examples/config/filesystem/README.txt | 2 +-
examples/config/filesystem/example-igfs.xml | 7 -
.../config/portable/example-ignite-portable.xml | 44 -
.../ignite/examples/portable/Address.java | 72 -
.../ignite/examples/portable/Employee.java | 93 -
.../ignite/examples/portable/EmployeeKey.java | 90 -
.../portable/ExamplePortableNodeStartup.java | 36 -
.../ignite/examples/portable/Organization.java | 93 -
.../examples/portable/OrganizationType.java | 32 -
...mputeClientPortableTaskExecutionExample.java | 154 -
.../portable/computegrid/ComputeClientTask.java | 116 -
.../portable/computegrid/package-info.java | 21 -
.../CacheClientPortablePutGetExample.java | 230 --
.../CacheClientPortableQueryExample.java | 325 --
.../portable/datagrid/package-info.java | 21 -
.../ignite/examples/portable/package-info.java | 21 -
.../CacheClientPortableExampleTest.java | 46 -
.../ComputeClientPortableExampleTest.java | 37 -
.../testsuites/IgniteExamplesSelfTestSuite.java | 6 -
modules/clients/pom.xml | 7 +
modules/clients/src/test/config/jdbc-config.xml | 55 +
.../jdbc2/JdbcComplexQuerySelfTest.java | 316 ++
.../internal/jdbc2/JdbcConnectionSelfTest.java | 268 ++
.../internal/jdbc2/JdbcEmptyCacheSelfTest.java | 140 +
.../internal/jdbc2/JdbcLocalCachesSelfTest.java | 156 +
.../internal/jdbc2/JdbcMetadataSelfTest.java | 334 ++
.../jdbc2/JdbcPreparedStatementSelfTest.java | 730 ++++
.../internal/jdbc2/JdbcResultSetSelfTest.java | 751 ++++
.../internal/jdbc2/JdbcStatementSelfTest.java | 292 ++
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 11 +
modules/core/pom.xml | 21 -
.../src/main/java/org/apache/ignite/Ignite.java | 7 -
.../java/org/apache/ignite/IgniteCache.java | 44 +-
.../org/apache/ignite/IgniteJdbcDriver.java | 281 +-
.../java/org/apache/ignite/IgnitePortables.java | 370 --
.../apache/ignite/IgniteSystemProperties.java | 5 +-
.../configuration/CacheConfiguration.java | 70 +-
.../ignite/internal/GridKernalContext.java | 7 +-
.../ignite/internal/GridKernalContextImpl.java | 10 +-
.../apache/ignite/internal/GridLoggerProxy.java | 6 +-
.../org/apache/ignite/internal/IgniteEx.java | 9 +
.../apache/ignite/internal/IgniteKernal.java | 14 +-
.../ignite/internal/IgniteNodeAttributes.java | 5 +-
.../internal/executor/GridExecutorService.java | 4 +-
.../ignite/internal/jdbc/JdbcConnection.java | 4 +
.../internal/jdbc/JdbcConnectionInfo.java | 91 -
.../internal/jdbc/JdbcDatabaseMetadata.java | 4 +
.../internal/jdbc/JdbcPreparedStatement.java | 4 +
.../ignite/internal/jdbc/JdbcResultSet.java | 4 +
.../internal/jdbc/JdbcResultSetMetadata.java | 4 +
.../ignite/internal/jdbc/JdbcStatement.java | 4 +
.../apache/ignite/internal/jdbc/JdbcUtils.java | 4 +
.../ignite/internal/jdbc2/JdbcConnection.java | 777 ++++
.../internal/jdbc2/JdbcDatabaseMetadata.java | 1401 +++++++
.../internal/jdbc2/JdbcPreparedStatement.java | 411 ++
.../ignite/internal/jdbc2/JdbcQueryTask.java | 361 ++
.../ignite/internal/jdbc2/JdbcResultSet.java | 1520 +++++++
.../internal/jdbc2/JdbcResultSetMetadata.java | 171 +
.../ignite/internal/jdbc2/JdbcStatement.java | 456 +++
.../apache/ignite/internal/jdbc2/JdbcUtils.java | 155 +
.../deployment/GridDeploymentStoreAdapter.java | 4 +-
.../discovery/GridDiscoveryManager.java | 10 -
.../portable/GridPortableMarshaller.java | 2 +-
.../portable/PortableClassDescriptor.java | 10 +-
.../internal/portable/PortableContext.java | 32 +-
.../portable/PortableMetaDataCollector.java | 6 +-
.../portable/PortableMetaDataHandler.java | 4 +-
.../internal/portable/PortableMetaDataImpl.java | 14 +-
.../internal/portable/PortableObjectEx.java | 6 +-
.../internal/portable/PortableObjectImpl.java | 6 +-
.../portable/PortableObjectOffheapImpl.java | 6 +-
.../internal/portable/PortableRawReaderEx.java | 4 +-
.../internal/portable/PortableRawWriterEx.java | 4 +-
.../portable/PortableReaderContext.java | 2 +-
.../internal/portable/PortableReaderExImpl.java | 10 +-
.../ignite/internal/portable/PortableUtils.java | 2 +-
.../internal/portable/PortableWriterExImpl.java | 11 +-
.../internal/portable/api/IgnitePortables.java | 362 ++
.../internal/portable/api/PortableBuilder.java | 136 +
.../portable/api/PortableException.java | 57 +
.../internal/portable/api/PortableIdMapper.java | 54 +
.../api/PortableInvalidClassException.java | 58 +
.../portable/api/PortableMarshalAware.java | 48 +
.../portable/api/PortableMarshaller.java | 358 ++
.../internal/portable/api/PortableMetadata.java | 60 +
.../internal/portable/api/PortableObject.java | 152 +
.../portable/api/PortableProtocolVersion.java | 41 +
.../portable/api/PortableRawReader.java | 234 ++
.../portable/api/PortableRawWriter.java | 219 +
.../internal/portable/api/PortableReader.java | 284 ++
.../portable/api/PortableSerializer.java | 47 +
.../portable/api/PortableTypeConfiguration.java | 195 +
.../internal/portable/api/PortableWriter.java | 266 ++
.../portable/builder/PortableBuilderEnum.java | 2 +-
.../portable/builder/PortableBuilderImpl.java | 14 +-
.../portable/builder/PortableBuilderReader.java | 2 +-
.../builder/PortableBuilderSerializer.java | 2 +-
.../builder/PortableEnumArrayLazyValue.java | 4 +-
.../builder/PortableObjectArrayLazyValue.java | 2 +-
.../builder/PortablePlainPortableObject.java | 2 +-
.../streams/PortableAbstractInputStream.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 31 +-
.../cache/GridCacheClearAllRunnable.java | 4 +-
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 4 +-
.../processors/cache/GridCacheLogger.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 15 +-
.../processors/cache/GridCacheMvcc.java | 5 +-
.../processors/cache/GridCacheProcessor.java | 37 +-
.../cache/GridCacheSharedContext.java | 4 +-
.../processors/cache/GridCacheSwapManager.java | 46 +-
.../processors/cache/IgniteCacheProxy.java | 5 -
.../distributed/GridCacheTxRecoveryFuture.java | 11 +-
.../distributed/GridDistributedCacheEntry.java | 6 +-
.../GridDistributedTxFinishRequest.java | 13 +-
.../GridDistributedTxRemoteAdapter.java | 18 +-
.../dht/GridClientPartitionTopology.java | 104 +-
.../distributed/dht/GridDhtLocalPartition.java | 1 +
.../dht/GridDhtPartitionTopology.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 16 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 514 ++-
.../distributed/dht/GridDhtTxFinishFuture.java | 15 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 84 +-
.../dht/GridDhtTxFinishResponse.java | 89 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 67 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 34 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 40 +-
.../dht/GridPartitionedGetFuture.java | 4 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../colocated/GridDhtColocatedLockFuture.java | 11 +-
.../distributed/near/GridNearLockFuture.java | 11 +-
.../distributed/near/GridNearLockRequest.java | 18 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 55 +-
.../GridNearPessimisticTxPrepareFuture.java | 11 +-
.../near/GridNearTxFinishFuture.java | 345 +-
.../near/GridNearTxFinishRequest.java | 20 +-
.../cache/distributed/near/GridNearTxLocal.java | 64 +-
.../distributed/near/GridNearTxRemote.java | 38 +-
.../CacheDefaultPortableAffinityKeyMapper.java | 2 +-
.../portable/CacheObjectPortableContext.java | 2 +-
.../portable/CacheObjectPortableProcessor.java | 8 +-
.../CacheObjectPortableProcessorImpl.java | 12 +-
.../cache/portable/IgnitePortablesImpl.java | 10 +-
.../cache/store/CacheOsStoreManager.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 5 +-
.../cache/transactions/IgniteTxHandler.java | 286 +-
.../transactions/IgniteTxLocalAdapter.java | 37 +-
.../cache/transactions/IgniteTxManager.java | 48 +-
.../continuous/GridContinuousProcessor.java | 22 +-
.../datastructures/DataStructuresProcessor.java | 102 +-
.../datastructures/GridCacheAtomicLongImpl.java | 4 +-
.../GridCacheAtomicReferenceImpl.java | 4 +-
.../GridCacheAtomicSequenceImpl.java | 4 +-
.../GridCacheAtomicStampedImpl.java | 4 +-
.../GridCacheCountDownLatchImpl.java | 4 +-
.../GridTransactionalCacheQueueImpl.java | 15 +-
.../processors/igfs/IgfsFileAffinityRange.java | 4 +-
.../processors/igfs/IgfsFileWorkerBatch.java | 15 +-
.../igfs/IgfsFragmentizerManager.java | 8 +-
.../processors/igfs/IgfsServerManager.java | 5 +-
.../internal/processors/job/GridJobWorker.java | 4 +-
.../dotnet/PlatformDotNetConfiguration.java | 12 +-
.../PlatformDotNetPortableConfiguration.java | 12 +-
...PlatformDotNetPortableTypeConfiguration.java | 12 +-
.../processors/task/GridTaskWorker.java | 4 +-
.../util/GridSpiCloseableIteratorWrapper.java | 5 +
.../ignite/internal/util/IgniteUtils.java | 16 +
.../util/offheap/GridOffHeapEvictListener.java | 2 +-
.../util/offheap/GridOffHeapPartitionedMap.java | 1 +
.../util/offheap/unsafe/GridUnsafeMap.java | 4 +-
.../marshaller/portable/PortableMarshaller.java | 358 --
.../marshaller/portable/package-info.java | 22 -
.../apache/ignite/portable/PortableBuilder.java | 137 -
.../ignite/portable/PortableException.java | 57 -
.../ignite/portable/PortableIdMapper.java | 56 -
.../portable/PortableInvalidClassException.java | 58 -
.../ignite/portable/PortableMarshalAware.java | 48 -
.../ignite/portable/PortableMetadata.java | 61 -
.../apache/ignite/portable/PortableObject.java | 154 -
.../portable/PortableProtocolVersion.java | 41 -
.../ignite/portable/PortableRawReader.java | 234 --
.../ignite/portable/PortableRawWriter.java | 219 -
.../apache/ignite/portable/PortableReader.java | 284 --
.../ignite/portable/PortableSerializer.java | 49 -
.../portable/PortableTypeConfiguration.java | 196 -
.../apache/ignite/portable/PortableWriter.java | 266 --
.../apache/ignite/portable/package-info.java | 22 -
.../resources/META-INF/classnames.properties | 20 +-
.../GridDiscoveryManagerAttributesSelfTest.java | 45 -
.../GridPortableAffinityKeySelfTest.java | 218 -
.../GridPortableBuilderAdditionalSelfTest.java | 1226 ------
.../portable/GridPortableBuilderSelfTest.java | 1021 -----
...eBuilderStringAsCharsAdditionalSelfTest.java | 28 -
...ridPortableBuilderStringAsCharsSelfTest.java | 28 -
...idPortableMarshallerCtxDisabledSelfTest.java | 256 --
.../GridPortableMarshallerSelfTest.java | 3807 ------------------
.../GridPortableMetaDataDisabledSelfTest.java | 238 --
.../portable/GridPortableMetaDataSelfTest.java | 369 --
.../portable/GridPortableWildcardsSelfTest.java | 482 ---
.../GridPortableMarshalerAwareTestClass.java | 67 -
.../mutabletest/GridPortableTestClasses.java | 434 --
.../portable/mutabletest/package-info.java | 22 -
.../ignite/internal/portable/package-info.java | 22 -
.../portable/test/GridPortableTestClass1.java | 28 -
.../portable/test/GridPortableTestClass2.java | 24 -
.../internal/portable/test/package-info.java | 22 -
.../test/subpackage/GridPortableTestClass3.java | 24 -
.../portable/test/subpackage/package-info.java | 22 -
.../CacheStoreUsageMultinodeAbstractTest.java | 16 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 57 +-
.../GridCacheAbstractRemoveFailureTest.java | 199 +-
.../cache/GridCacheMemoryModeSelfTest.java | 9 +-
.../processors/cache/GridCacheMvccSelfTest.java | 4 +-
.../cache/GridCacheP2PUndeploySelfTest.java | 30 +-
.../cache/GridCachePutAllFailoverSelfTest.java | 88 +-
.../GridCacheVariableTopologySelfTest.java | 5 +-
.../cache/IgniteCacheCreateRestartSelfTest.java | 106 +
.../IgniteCacheEntryProcessorNodeJoinTest.java | 73 +
.../cache/IgniteCachePutAllRestartTest.java | 2 +
.../cache/IgniteInternalCacheTypesTest.java | 4 +-
.../cache/IgniteOnePhaseCommitNearSelfTest.java | 243 ++
.../IgniteTxExceptionAbstractSelfTest.java | 29 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 149 +-
...ridCachePartitionNotLoadedEventSelfTest.java | 27 +-
.../GridCacheTransformEventSelfTest.java | 5 +-
.../GridCacheColocatedTxExceptionSelfTest.java | 2 +-
.../GridCacheDhtAtomicRemoveFailureTest.java | 16 +-
.../dht/GridCacheDhtRemoveFailureTest.java | 16 +-
.../dht/GridCacheTxNodeFailureSelfTest.java | 405 ++
.../dht/GridNearCacheTxNodeFailureSelfTest.java | 31 +
...gniteAtomicLongChangingTopologySelfTest.java | 278 ++
.../IgniteCacheCrossCacheTxFailoverTest.java | 53 +-
.../IgniteCachePutRetryAbstractSelfTest.java | 166 +-
.../dht/IgniteCachePutRetryAtomicSelfTest.java | 2 +
...gniteCachePutRetryTransactionalSelfTest.java | 50 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 98 +-
...tomicPrimaryWriteOrderRemoveFailureTest.java | 15 +-
.../GridCacheAtomicRemoveFailureTest.java | 15 +-
.../GridCacheAtomicNearRemoveFailureTest.java | 15 +-
...cPrimaryWriteOrderNearRemoveFailureTest.java | 15 +-
.../near/GridCacheNearRemoveFailureTest.java | 15 +-
.../near/GridCacheNearTxExceptionSelfTest.java | 2 +-
.../GridCachePartitionedNodeRestartTest.java | 9 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 9 +-
.../near/IgniteCacheNearOnlyTxTest.java | 14 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 8 +-
.../GridCacheReplicatedTxExceptionSelfTest.java | 2 +-
.../GridCacheLocalTxExceptionSelfTest.java | 2 +-
...ClientNodePortableMetadataMultinodeTest.java | 295 --
...GridCacheClientNodePortableMetadataTest.java | 286 --
...ableObjectsAbstractDataStreamerSelfTest.java | 190 -
...bleObjectsAbstractMultiThreadedSelfTest.java | 231 --
...ridCachePortableObjectsAbstractSelfTest.java | 978 -----
.../GridCachePortableStoreAbstractSelfTest.java | 297 --
.../GridCachePortableStoreObjectsSelfTest.java | 55 -
...GridCachePortableStorePortablesSelfTest.java | 66 -
...ridPortableCacheEntryMemorySizeSelfTest.java | 55 -
...leDuplicateIndexObjectsAbstractSelfTest.java | 158 -
.../DataStreamProcessorPortableSelfTest.java | 66 -
.../GridDataStreamerImplSelfTest.java | 345 --
...ridCacheAffinityRoutingPortableSelfTest.java | 47 -
...lyPortableDataStreamerMultiNodeSelfTest.java | 29 -
...rtableDataStreamerMultithreadedSelfTest.java | 47 -
...artitionedOnlyPortableMultiNodeSelfTest.java | 28 -
...tionedOnlyPortableMultithreadedSelfTest.java | 47 -
.../GridCacheMemoryModePortableSelfTest.java | 36 -
...acheOffHeapTieredAtomicPortableSelfTest.java | 47 -
...eapTieredEvictionAtomicPortableSelfTest.java | 95 -
...heOffHeapTieredEvictionPortableSelfTest.java | 95 -
.../GridCacheOffHeapTieredPortableSelfTest.java | 47 -
...ateIndexObjectPartitionedAtomicSelfTest.java | 38 -
...xObjectPartitionedTransactionalSelfTest.java | 41 -
...AtomicNearDisabledOffheapTieredSelfTest.java | 29 -
...rtableObjectsAtomicNearDisabledSelfTest.java | 51 -
...tableObjectsAtomicOffheapTieredSelfTest.java | 29 -
.../GridCachePortableObjectsAtomicSelfTest.java | 51 -
...tionedNearDisabledOffheapTieredSelfTest.java | 30 -
...eObjectsPartitionedNearDisabledSelfTest.java | 51 -
...ObjectsPartitionedOffheapTieredSelfTest.java | 30 -
...CachePortableObjectsPartitionedSelfTest.java | 51 -
...sNearPartitionedByteArrayValuesSelfTest.java | 41 -
...sPartitionedOnlyByteArrayValuesSelfTest.java | 42 -
...dCachePortableObjectsReplicatedSelfTest.java | 51 -
...CachePortableObjectsAtomicLocalSelfTest.java | 32 -
...rtableObjectsLocalOffheapTieredSelfTest.java | 29 -
.../GridCachePortableObjectsLocalSelfTest.java | 51 -
.../processors/igfs/IgfsAbstractSelfTest.java | 61 +-
.../igfs/IgfsDualAbstractSelfTest.java | 10 +-
.../processors/igfs/IgfsStartCacheTest.java | 2 +-
.../offheap/GridOffHeapMapAbstractSelfTest.java | 6 +-
...idOffHeapPartitionedMapAbstractSelfTest.java | 10 +-
.../stream/socket/SocketStreamerSelfTest.java | 29 +-
.../ignite/testframework/GridTestUtils.java | 117 +
.../ignite/testframework/junits/IgniteMock.java | 8 +-
.../multijvm/IgniteCacheProcessProxy.java | 5 -
.../junits/multijvm/IgniteProcessProxy.java | 2 +-
.../IgniteCacheFailoverTestSuite.java | 15 +-
.../IgniteCacheFailoverTestSuite3.java | 62 +
.../testsuites/IgniteCacheRestartTestSuite.java | 15 +-
.../IgniteCacheRestartTestSuite2.java | 47 +
.../IgnitePortableCacheFullApiTestSuite.java | 37 -
.../IgnitePortableCacheTestSuite.java | 103 -
.../IgnitePortableObjectsTestSuite.java | 92 -
.../ignite/portable/test1/1.1/test1-1.1.jar | Bin 2548 -> 0 bytes
.../ignite/portable/test1/1.1/test1-1.1.pom | 9 -
.../portable/test1/maven-metadata-local.xml | 12 -
.../ignite/portable/test2/1.1/test2-1.1.jar | Bin 1361 -> 0 bytes
.../ignite/portable/test2/1.1/test2-1.1.pom | 9 -
.../portable/test2/maven-metadata-local.xml | 12 -
.../hadoop/SecondaryFileSystemProvider.java | 4 +-
.../hadoop/igfs/HadoopIgfsWrapper.java | 100 +-
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 +
.../testsuites/IgniteHadoopTestSuite.java | 6 +-
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 31 +-
...CacheScanPartitionQueryFallbackSelfTest.java | 105 +-
...QueryOffheapEvictsMultiThreadedSelfTest.java | 5 -
...lientQueryReplicatedNodeRestartSelfTest.java | 8 +-
.../IgniteCacheQueryNodeRestartSelfTest.java | 4 +-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 10 +-
.../IgniteCacheReplicatedQuerySelfTest.java | 49 +-
.../IgnitePortableCacheQueryTestSuite.java | 117 -
.../platform/PlatformContextImpl.java | 4 +-
.../platform/compute/PlatformCompute.java | 2 +-
.../cpp/PlatformCppConfigurationClosure.java | 2 +-
.../PlatformDotNetConfigurationClosure.java | 6 +-
.../platform/events/PlatformEvents.java | 2 +-
.../services/PlatformAbstractService.java | 3 +-
.../Cache/CacheAbstractTest.cs | 71 +-
.../Config/Compute/compute-grid1.xml | 8 +-
.../PlatformComputePortableArgTask.java | 8 +-
.../ignite/schema/generator/CodeGenerator.java | 4 +-
.../ignite/schema/model/PojoDescriptor.java | 6 +-
.../parser/dialect/OracleMetadataDialect.java | 7 +-
.../org/apache/ignite/IgniteSpringBean.java | 7 -
.../yardstick/config/benchmark-query.properties | 5 +-
modules/yardstick/config/ignite-base-config.xml | 2 +-
modules/yardstick/config/ignite-jdbc-config.xml | 55 +
parent/pom.xml | 10 -
pom.xml | 10 +
343 files changed, 15499 insertions(+), 18992 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b80b1719/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 6786b7e,da15de3..1056990
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@@ -267,10 -233,10 +268,11 @@@ public class SocketStreamerSelfTest ext
* @param converter Converter.
* @param r Runnable..
*/
- private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
- boolean oneMessagePerTuple) throws Exception
- {
- private void test(@Nullable SocketMessageConverter<Tuple> converter,
- @Nullable byte[] delim,
- Runnable r) throws Exception {
- SocketStreamer<Tuple, Integer, String> sockStmr = null;
++ private void test(@Nullable SocketMessageConverter<Message> converter,
++ @Nullable byte[] delim,
++ Runnable r,
++ boolean oneMessagePerTuple) throws Exception {
+ SocketStreamer<Message, Integer, String> sockStmr = null;
Ignite ignite = grid(0);
[2/4] ignite git commit: IGNITE-1370 Deprecate StreamTupleExtractor
in favor of new Stream*Single*TupleExtractor.
Posted by ra...@apache.org.
IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1dee96e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1dee96e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1dee96e
Branch: refs/heads/master
Commit: b1dee96e0d2b6130959400615b29b02721075392
Parents: 4d9734a
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:48:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 00:48:46 2015 +0100
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 5 +-
.../org/apache/ignite/stream/StreamAdapter.java | 64 ++++++++++++++++----
.../stream/StreamSingleTupleExtractor.java | 40 ++++++++++++
.../ignite/stream/StreamTupleExtractor.java | 23 +++----
.../ignite/stream/socket/SocketStreamer.java | 2 +-
.../stream/socket/SocketStreamerSelfTest.java | 6 +-
6 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 8e961b9..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.examples.streaming.wordcount.socket;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.util.Map;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
@@ -31,7 +32,7 @@ import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.streaming.wordcount.CacheConfig;
import org.apache.ignite.examples.streaming.wordcount.QueryWords;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.stream.StreamTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.socket.SocketMessageConverter;
import org.apache.ignite.stream.socket.SocketStreamer;
@@ -108,7 +109,7 @@ public class WordsSocketStreamerServer {
}
});
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<String, AffinityUuid, String>() {
@Override public Map.Entry<AffinityUuid, String> extract(String word) {
// By using AffinityUuid we ensure that identical
// words are processed on the same cluster node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index ffa0821..e7d224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,8 +37,9 @@ import org.apache.ignite.IgniteDataStreamer;
* </ol>
*/
public abstract class StreamAdapter<T, K, V> {
- /** Tuple extractor. */
- private StreamTupleExtractor<T, K, V> extractor;
+
+ /** Tuple extractor extracting a single tuple from an event */
+ private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
/** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
@@ -60,11 +61,22 @@ public abstract class StreamAdapter<T, K, V> {
* Stream adapter.
*
* @param stmr Streamer.
- * @param extractor Tuple extractor.
+ * @param extractor Tuple extractor (1:1).
*/
- protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamSingleTupleExtractor<T, K, V> extractor) {
this.stmr = stmr;
- this.extractor = extractor;
+ this.singleTupleExtractor = extractor;
+ }
+
+ /**
+ * Stream adapter.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor (1:n).
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamMultipleTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.multipleTupleExtractor = extractor;
}
/**
@@ -83,16 +95,44 @@ public abstract class StreamAdapter<T, K, V> {
/**
* @return Provided tuple extractor.
+ * @see #getSingleTupleExtractor()
*/
+ @Deprecated
public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- return extractor;
+ if (singleTupleExtractor instanceof StreamTupleExtractor) {
+ return (StreamTupleExtractor) singleTupleExtractor;
+ }
+ throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
+ "StreamTupleExtractor; use getSingleTupleExtractor instead");
}
/**
- * @param extractor Extractor for key-value tuples from messages.
+ * @param extractor Extractor for a single key-value tuple from the message.
+ * @see #setSingleTupleExtractor(StreamSingleTupleExtractor)
*/
+ @Deprecated
public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- this.extractor = extractor;
+ if (multipleTupleExtractor != null) {
+ throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+ }
+ this.singleTupleExtractor = extractor;
+ }
+
+ /**
+ * @return Provided single tuple extractor.
+ */
+ public StreamSingleTupleExtractor<T, K, V> getSingleTupleExtractor() {
+ return singleTupleExtractor;
+ }
+
+ /**
+ * @param singleTupleExtractor Extractor for key-value tuples from messages.
+ */
+ public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
+ if (multipleTupleExtractor != null) {
+ throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+ }
+ this.singleTupleExtractor = singleTupleExtractor;
}
/**
@@ -106,6 +146,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param multipleTupleExtractor Extractor for 1:n tuple extraction.
*/
public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+ if (singleTupleExtractor != null) {
+ throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
+ }
this.multipleTupleExtractor = multipleTupleExtractor;
}
@@ -126,15 +169,12 @@ public abstract class StreamAdapter<T, K, V> {
/**
* Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
* underlying streamer.
- * <p>
- * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
- * and the latter will be ignored.
*
* @param msg Message to convert.
*/
protected void addMessage(T msg) {
if (multipleTupleExtractor == null) {
- Map.Entry<K, V> e = extractor.extract(msg);
+ Map.Entry<K, V> e = singleTupleExtractor.extract(msg);
if (e != null)
stmr.addData(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
new file mode 100644
index 0000000..fd50e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a message to a single Ignite key-value tuple.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be used in cases where a single message/event may
+ * produce more than one tuple.
+ * <p>
+ * NOTE: This interface supersedes the former {@link StreamTupleExtractor} which is now deprecated.
+ *
+ * @see StreamMultipleTupleExtractor
+ */
+public interface StreamSingleTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message.
+ * @return Key-value tuple.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index aed7d8a..5cd8d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -17,22 +17,13 @@
package org.apache.ignite.stream;
-import java.util.Map;
-
/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- * <p>
- * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
- * produce more than one tuple.
+ * This interface is provided for backwards compatibility with {@link StreamSingleTupleExtractor}.
*
- * @see StreamMultipleTupleExtractor
+ * @deprecated Will be removed in 2.0.0.
+ * @see StreamSingleTupleExtractor
*/
-public interface StreamTupleExtractor<T, K, V> {
- /**
- * Extracts a key-value tuple from a message.
- *
- * @param msg Message.
- * @return Key-value tuple.
- */
- public Map.Entry<K, V> extract(T msg);
-}
\ No newline at end of file
+@Deprecated
+public interface StreamTupleExtractor<T, K, V> extends StreamSingleTupleExtractor<T, K, V> {
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index c89952d..066a5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* @throws IgniteException If failed.
*/
public void start() {
- A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+ A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
"tupleExtractor (single or multiple)");
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 8b05754..6786b7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -44,9 +45,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -292,7 +294,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
sockStmr.setDelimiter(delim);
if (oneMessagePerTuple) {
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+ sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Message, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(Message msg) {
return new IgniteBiTuple<>(msg.key, msg.val);
}