You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/21 13:02:26 UTC
[10/15] ignite git commit: Direct marshalling optimizations
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 987090d..aa88808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.jetbrains.annotations.Nullable;
@@ -41,46 +40,48 @@ public class GridDirectParser implements GridNioParser {
private final MessageFactory msgFactory;
/** */
- private final MessageFormatter formatter;
+ private final GridNioMessageReaderFactory readerFactory;
/**
* @param msgFactory Message factory.
- * @param formatter Formatter.
+ * @param readerFactory Message reader factory.
*/
- public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) {
+ public GridDirectParser(MessageFactory msgFactory, GridNioMessageReaderFactory readerFactory) {
assert msgFactory != null;
- assert formatter != null;
+ assert readerFactory != null;
this.msgFactory = msgFactory;
- this.formatter = formatter;
+ this.readerFactory = readerFactory;
}
/** {@inheritDoc} */
@Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
throws IOException, IgniteCheckedException {
- Message msg = ses.removeMeta(MSG_META_KEY);
+ MessageReader reader = ses.meta(READER_META_KEY);
- MessageReader reader = null;
+ if (reader == null)
+ ses.addMeta(READER_META_KEY, reader = readerFactory.reader(ses, msgFactory));
- if (msg == null && buf.hasRemaining()) {
- msg = msgFactory.create(buf.get());
+ Message msg = ses.removeMeta(MSG_META_KEY);
- ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, msg.getClass()));
- }
+ if (msg == null && buf.hasRemaining())
+ msg = msgFactory.create(buf.get());
boolean finished = false;
if (buf.hasRemaining()) {
- if (reader == null)
- reader = ses.meta(READER_META_KEY);
-
- assert reader != null;
+ if (reader != null)
+ reader.setCurrentReadClass(msg.getClass());
finished = msg.readFrom(buf, reader);
}
- if (finished)
+ if (finished) {
+ if (reader != null)
+ reader.reset();
+
return msg;
+ }
else {
ses.addMeta(MSG_META_KEY, msg);
@@ -93,4 +94,4 @@ public class GridDirectParser implements GridNioParser {
// No encoding needed for direct messages.
throw new UnsupportedEncodingException();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
new file mode 100644
index 0000000..cb7e62d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+
+/**
+ * Message reader factory.
+ */
+public interface GridNioMessageReaderFactory {
+ /**
+ * Creates new reader.
+ *
+ * @param ses Current session.
+ * @param msgFactory Message factory.
+ * @return Reader.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
new file mode 100644
index 0000000..6c4dee4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message writer factory.
+ */
+public interface GridNioMessageWriterFactory {
+ /**
+ * Creates new writer.
+ *
+ * @param ses Current session.
+ * @return Writer.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5bd08e7..384ee1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -62,7 +62,6 @@ import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -176,7 +175,7 @@ public class GridNioServer<T> {
/** */
@GridToStringExclude
- private MessageFormatter formatter;
+ private GridNioMessageWriterFactory writerFactory;
/** */
@GridToStringExclude
@@ -212,7 +211,7 @@ public class GridNioServer<T> {
* @param directMode Whether direct mode is used.
* @param daemon Daemon flag to create threads.
* @param metricsLsnr Metrics listener.
- * @param formatter Message formatter.
+ * @param writerFactory Writer factory.
* @param skipRecoveryPred Skip recovery predicate.
* @param msgQueueLsnr Message queue size listener.
* @param filters Filters for this server.
@@ -234,7 +233,7 @@ public class GridNioServer<T> {
boolean directMode,
boolean daemon,
GridNioMetricsListener metricsLsnr,
- MessageFormatter formatter,
+ GridNioMessageWriterFactory writerFactory,
IgnitePredicate<Message> skipRecoveryPred,
IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
GridNioFilter... filters
@@ -304,7 +303,7 @@ public class GridNioServer<T> {
this.directMode = directMode;
this.metricsLsnr = metricsLsnr;
- this.formatter = formatter;
+ this.writerFactory = writerFactory;
this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
}
@@ -934,8 +933,14 @@ public class GridNioServer<T> {
MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
- if (writer == null)
- ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
+ if (writer == null) {
+ try {
+ ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to create message writer.", e);
+ }
+ }
boolean handshakeFinished = sslFilter.lock(ses);
@@ -1129,8 +1134,14 @@ public class GridNioServer<T> {
MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
- if (writer == null)
- ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
+ if (writer == null) {
+ try {
+ ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to create message writer.", e);
+ }
+ }
if (req == null) {
req = (NioOperationFuture<?>)ses.pollFuture();
@@ -1152,7 +1163,7 @@ public class GridNioServer<T> {
finished = msg.writeTo(buf, writer);
- if (finished)
+ if (finished && writer != null)
writer.reset();
}
@@ -1176,7 +1187,7 @@ public class GridNioServer<T> {
finished = msg.writeTo(buf, writer);
- if (finished)
+ if (finished && writer != null)
writer.reset();
}
@@ -2221,8 +2232,8 @@ public class GridNioServer<T> {
/** Daemon flag. */
private boolean daemon;
- /** Message formatter. */
- private MessageFormatter formatter;
+ /** Writer factory. */
+ private GridNioMessageWriterFactory writerFactory;
/** Skip recovery predicate. */
private IgnitePredicate<Message> skipRecoveryPred;
@@ -2253,7 +2264,7 @@ public class GridNioServer<T> {
directMode,
daemon,
metricsLsnr,
- formatter,
+ writerFactory,
skipRecoveryPred,
msgQueueLsnr,
filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
@@ -2450,11 +2461,11 @@ public class GridNioServer<T> {
}
/**
- * @param formatter Message formatter.
+ * @param writerFactory Writer factory.
* @return This for chaining.
*/
- public Builder<T> messageFormatter(MessageFormatter formatter) {
- this.formatter = formatter;
+ public Builder<T> writerFactory(GridNioMessageWriterFactory writerFactory) {
+ this.writerFactory = writerFactory;
return this;
}
@@ -2479,4 +2490,4 @@ public class GridNioServer<T> {
return this;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 93e789d..ebe86fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
-import org.jetbrains.annotations.Nullable;
/**
*
@@ -120,16 +119,17 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
}
/** {@inheritDoc} */
- @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg,
- IgniteInClosure<IgniteException> closure)
- throws IgniteCheckedException {
+ @Override public synchronized boolean sendMessage(UUID nodeId, Message msg,
+ IgniteInClosure<IgniteException> closure) throws IgniteCheckedException {
+ assert nodeId != null;
+
if (closed())
throw new IgniteCheckedException("Communication client was closed: " + this);
assert writeBuf.hasArray();
try {
- int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer());
+ int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer(nodeId));
metricsLsnr.onBytesSent(cnt);
}
@@ -154,4 +154,4 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
@Override public String toString() {
return S.toString(GridShmemCommunicationClient.class, this, super.toString());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
index 4c9e0ee..7bc2a53 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
@@ -17,6 +17,8 @@
package org.apache.ignite.plugin.extensions.communication;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.plugin.Extension;
/**
@@ -33,16 +35,19 @@ public interface MessageFormatter extends Extension {
/**
* Creates new message writer instance.
*
+ * @param rmtNodeId Remote node ID.
* @return Message writer.
+ * @throws IgniteCheckedException In case of error.
*/
- public MessageWriter writer();
+ public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException;
/**
* Creates new message reader instance.
*
- * @param factory Message factory.
- * @param msgCls Message class to read.
+ * @param rmtNodeId Remote node ID.
+ * @param msgFactory Message factory.
* @return Message reader.
+ * @throws IgniteCheckedException In case of error.
*/
- public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls);
-}
\ No newline at end of file
+ public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index d40a384..502c69f 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -39,6 +39,13 @@ public interface MessageReader {
public void setBuffer(ByteBuffer buf);
/**
+ * Sets type of message currently read.
+ *
+ * @param msgCls Message type.
+ */
+ public void setCurrentReadClass(Class<? extends Message> msgCls);
+
+ /**
* Callback that must be invoked by a message implementation before message body started decoding.
*
* @return {@code True} if reading can proceed, {@code false} otherwise.
@@ -272,4 +279,21 @@ public interface MessageReader {
* Increments read state.
*/
public void incrementState();
-}
\ No newline at end of file
+
+ /**
+ * Callback called before inner message is read.
+ */
+ public void beforeInnerMessageRead();
+
+ /**
+ * Callback called after inner message is read.
+ *
+ * @param finished Whether message was fully read.
+ */
+ public void afterInnerMessageRead(boolean finished);
+
+ /**
+ * Resets this reader.
+ */
+ public void reset();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index cd46de4..1cb202c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -696,11 +696,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
if (msgFormatter0 == null) {
msgFormatter0 = new MessageFormatter() {
- @Override public MessageWriter writer() {
+ @Override public MessageWriter writer(UUID rmtNodeId) {
throw new IgniteException("Failed to write message, node is not started.");
}
- @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
+ @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) {
throw new IgniteException("Failed to read message, node is not started.");
}
};
@@ -880,4 +880,4 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e8bd8a1..68e2f43 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -76,7 +76,9 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
+import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -1575,29 +1577,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- MessageFormatter msgFormatter = new MessageFormatter() {
- private MessageFormatter impl;
+ GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
+ private MessageFormatter formatter;
- @Override public MessageWriter writer() {
- if (impl == null)
- impl = getSpiContext().messageFormatter();
+ @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
+ throws IgniteCheckedException {
+ if (formatter == null)
+ formatter = getSpiContext().messageFormatter();
- assert impl != null;
+ assert formatter != null;
- return impl.writer();
+ UUID rmtNodeId = ses.meta(NODE_ID_META);
+
+ return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
}
+ };
- @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
- if (impl == null)
- impl = getSpiContext().messageFormatter();
+ GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
+ private MessageFormatter formatter;
- assert impl != null;
+ @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
+ if (formatter == null)
+ formatter = getSpiContext().messageFormatter();
+
+ assert formatter != null;
+
+ UUID rmtNodeId = ses.meta(NODE_ID_META);
- return impl.reader(factory, msgCls);
+ return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
}
};
- GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);
+ GridDirectParser parser = new GridDirectParser(msgFactory, readerFactory);
IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
@Override public boolean apply(Message msg) {
@@ -1658,7 +1669,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.metricsListener(metricsLsnr)
.writeTimeout(sockWriteTimeout)
.filters(filters)
- .messageFormatter(msgFormatter)
+ .writerFactory(writerFactory)
.skipRecoveryPredicate(skipRecoveryPred)
.messageQueueSizeListener(queueSizeMonitor)
.build();
@@ -1918,7 +1929,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
UUID nodeId = null;
- if (!client.async() && !locNode.version().equals(node.version()))
+ if (!client.async())
nodeId = node.id();
retry = client.sendMessage(nodeId, msg, ackC);
@@ -2591,7 +2602,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
buf.order(ByteOrder.nativeOrder());
- boolean written = msg.writeTo(buf, getSpiContext().messageFormatter().writer());
+ boolean written = msg.writeTo(buf, null);
assert written;
@@ -2932,25 +2943,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- MessageFormatter msgFormatter = new MessageFormatter() {
- private MessageFormatter impl;
+ GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
+ private MessageFormatter formatter;
- @Override public MessageWriter writer() {
- if (impl == null)
- impl = getSpiContext().messageFormatter();
+ @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
+ if (formatter == null)
+ formatter = getSpiContext().messageFormatter();
- assert impl != null;
+ assert formatter != null;
- return impl.writer();
+ UUID rmtNodeId = ses.meta(NODE_ID_META);
+
+ return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
}
+ };
- @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
- if (impl == null)
- impl = getSpiContext().messageFormatter();
+ GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
+ private MessageFormatter formatter;
- assert impl != null;
+ @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
+ throws IgniteCheckedException {
+ if (formatter == null)
+ formatter = getSpiContext().messageFormatter();
+
+ assert formatter != null;
+
+ UUID rmtNodeId = ses.meta(NODE_ID_META);
- return impl.reader(factory, msgCls);
+ return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
}
};
@@ -2959,8 +2979,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log,
endpoint,
srvLsnr,
- msgFormatter,
- new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true),
+ writerFactory,
+ new GridNioCodecFilter(new GridDirectParser(msgFactory, readerFactory), log, true),
new GridConnectionBytesVerifyFilter(log)
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
index f0f41bf..176973e 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
@@ -29,6 +29,11 @@ import org.apache.ignite.cache.CacheEntryProcessor;
/**
* Convenience adapter to transform update existing values in streaming cache
* based on the previously cached value.
+ * <p>
+ * This transformer implement {@link EntryProcessor} and internally will call
+ * {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method. Note
+ * that the value received from the data streamer will be passed to the entry
+ * processor as an argument.
*/
public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, EntryProcessor<K, V, Object> {
/** */
@@ -37,7 +42,7 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E
/** {@inheritDoc} */
@Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException {
for (Map.Entry<K, V> entry : entries)
- cache.invoke(entry.getKey(), this);
+ cache.invoke(entry.getKey(), this, entry.getValue());
}
/**
@@ -53,4 +58,4 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E
}
};
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 906a050..e257a97 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -37,12 +37,12 @@ import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -492,12 +492,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
@Override public MessageFormatter messageFormatter() {
if (formatter == null) {
formatter = new MessageFormatter() {
- @Override public MessageWriter writer() {
- return new DirectMessageWriter();
+ @Override public MessageWriter writer(UUID rmtNodeId) {
+ return new DirectMessageWriter(GridIoManager.DIRECT_PROTO_VER);
}
- @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
- return new DirectMessageReader(factory, this);
+ @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) {
+ return new DirectMessageReader(msgFactory, GridIoManager.DIRECT_PROTO_VER);
}
};
}
@@ -573,4 +573,4 @@ public class GridSpiTestContext implements IgniteSpiContext {
this.obj = obj;
}
}
-}
\ No newline at end of file
+}