You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/12/01 00:00:53 UTC
[geode] 01/01: GEODE-6121 Port SnappyData DirectChannel comms
changes to Geode
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-6121
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3aa2a894802c50db276d9c87f00432cd1908291d
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Nov 30 15:30:00 2018 -0800
GEODE-6121 Port SnappyData DirectChannel comms changes to Geode
Not ready for review!
Initial port of SnappyData peer-to-peer communication changes to Geode.
See https://github.com/SnappyDataInc/snappy-store/commit/49222766a35c99f4853b7b6661457f37b3c39349
The current changes have connection-streaming and conserve-sockets=false
as the default for testing but I will be changing this. The connection-streaming
feature will be an opt-in thing at first and probably be marked experimental.
---
geode-core/build.gradle | 1 +
.../internal/ClusterDistributionManager.java | 5 +-
.../distributed/internal/DistributionConfig.java | 2 +-
.../distributed/internal/direct/DirectChannel.java | 52 +-
.../geode/internal/shared/BufferAllocator.java | 198 ++++++
.../internal/shared/ChannelBufferOutputStream.java | 161 +++++
.../geode/internal/shared/InputStreamChannel.java | 308 ++++++++
.../apache/geode/internal/shared/NativeCalls.java | 0
.../geode/internal/shared/OutputStreamChannel.java | 239 +++++++
.../geode/internal/shared/StreamChannel.java | 42 ++
.../shared/unsafe/ChannelBufferInputStream.java | 167 +++++
.../unsafe/ChannelBufferUnsafeDataInputStream.java | 221 ++++++
.../ChannelBufferUnsafeDataOutputStream.java | 298 ++++++++
.../unsafe/ChannelBufferUnsafeInputStream.java | 270 +++++++
.../unsafe/ChannelBufferUnsafeOutputStream.java | 305 ++++++++
.../shared/unsafe/DirectBufferAllocator.java | 165 +++++
.../geode/internal/shared/unsafe/FreeMemory.java | 49 ++
.../geode/internal/shared/unsafe/UnsafeHolder.java | 397 +++++++++++
.../apache/geode/internal/tcp/BaseMsgStreamer.java | 6 +
.../org/apache/geode/internal/tcp/Buffers.java | 19 +-
.../geode/internal/tcp/ConnectExceptions.java | 0
.../org/apache/geode/internal/tcp/Connection.java | 781 ++++++++++++++++++---
.../apache/geode/internal/tcp/ConnectionTable.java | 225 +++++-
.../geode/internal/tcp/DirectReplySender.java | 14 +-
.../geode/internal/tcp/MsgChannelDestreamer.java | 112 +++
.../geode/internal/tcp/MsgChannelStreamer.java | 360 ++++++++++
.../apache/geode/internal/tcp/MsgOutputStream.java | 2 +-
.../org/apache/geode/internal/tcp/MsgStreamer.java | 9 +-
.../apache/geode/internal/tcp/MsgStreamerList.java | 23 +-
.../geode/internal/tcp/QueryKeyedObjectPool.java | 200 ++++++
.../org/apache/geode/internal/tcp/TCPConduit.java | 67 +-
.../geode/pdx/internal/unsafe/UnsafeWrapper.java | 5 +
.../geode/internal/tcp/ConnectionJUnitTest.java | 1 +
geode-core/src/test/resources/expected-pom.xml | 6 +
gradle/dependency-versions.properties | 1 +
35 files changed, 4523 insertions(+), 188 deletions(-)
diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 605fbb4..4c35fa2 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -159,6 +159,7 @@ dependencies {
compile('antlr:antlr:' + project.'antlr.version')
compile('com.fasterxml.jackson.core:jackson-annotations:' + project.'jackson.version')
compile('com.fasterxml.jackson.core:jackson-databind:' + project.'jackson.version')
+ compile('org.apache.commons:commons-pool2:' + project.'commons-pool2.version')
compile('commons-io:commons-io:' + project.'commons-io.version')
compile('commons-validator:commons-validator:' + project.'commons-validator.version')
compile('commons-digester:commons-digester:' + project.'commons-digester.version')
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index c523164..3c5c2b5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -151,7 +151,10 @@ public class ClusterDistributionManager implements DistributionManager {
public static final int MAX_THREADS =
Integer.getInteger("DistributionManager.MAX_THREADS", 100).intValue();
- private static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
+ public static final int MAX_PR_THREADS_SET =
+ Integer.getInteger("DistributionManager.MAX_PR_THREADS", -1);
+
+ public static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
Math.max(Runtime.getRuntime().availableProcessors() * 4, 16)).intValue();
public static final int MAX_FE_THREADS = Integer.getInteger("DistributionManager.MAX_FE_THREADS",
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index 08e5fa0..c9f2ff8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -1831,7 +1831,7 @@ public interface DistributionConfig extends Config, LogConfig, StatisticsConfig
/**
* The default value of the {@link ConfigurationProperties#CONSERVE_SOCKETS} property
*/
- boolean DEFAULT_CONSERVE_SOCKETS = true;
+ boolean DEFAULT_CONSERVE_SOCKETS = false;
/**
* Returns the value of the {@link ConfigurationProperties#ROLES} property
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index dbb4068..66c8c54 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -54,6 +54,7 @@ import org.apache.geode.internal.tcp.ConnectExceptions;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.internal.tcp.ConnectionException;
import org.apache.geode.internal.tcp.MemberShunnedException;
+import org.apache.geode.internal.tcp.MsgChannelStreamer;
import org.apache.geode.internal.tcp.MsgStreamer;
import org.apache.geode.internal.tcp.TCPConduit;
import org.apache.geode.internal.util.Breadcrumbs;
@@ -315,8 +316,10 @@ public class DirectChannel {
msg, p_destinations.length, Arrays.toString(p_destinations));
}
- try {
- do {
+ final boolean useNIOStream = getConduit().useNIOStream();
+ final List<Connection> allCons = new ArrayList<>(destinations.length);
+ do {
+ try {
interrupted = Thread.interrupted() || interrupted;
/**
* Exceptions that happened during one attempt to send
@@ -331,9 +334,14 @@ public class DirectChannel {
retryInfo = null;
retry = true;
}
- final List cons = new ArrayList(destinations.length);
- ConnectExceptions ce = getConnections(mgr, msg, destinations, orderedMsg, retry, ackTimeout,
- ackSDTimeout, cons);
+ final List<Connection> cons = new ArrayList<>(destinations.length);
+ ConnectExceptions ce;
+ try {
+ ce = getConnections(mgr, msg, destinations, orderedMsg,
+ retry, ackTimeout, ackSDTimeout, cons);
+ } finally {
+ allCons.addAll(cons);
+ }
if (directReply && msg.getProcessorId() > 0) { // no longer a direct-reply message?
directReply = false;
}
@@ -382,8 +390,13 @@ public class DirectChannel {
DMStats stats = getDMStats();
List<?> sentCons; // used for cons we sent to this time
- final BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats);
+ BaseMsgStreamer ms = null;
try {
+ if (useNIOStream) {
+ ms = MsgChannelStreamer.create(cons, msg, directReply, stats);
+ } else {
+ ms = MsgStreamer.create(cons, msg, directReply, stats);
+ }
startTime = 0;
if (ackTimeout > 0) {
startTime = System.currentTimeMillis();
@@ -409,7 +422,9 @@ public class DirectChannel {
ex);
} finally {
try {
- ms.close();
+ if (ms != null) {
+ ms.close();
+ }
} catch (IOException e) {
throw new InternalGemFireException("Unknown error serializing message", e);
}
@@ -453,16 +468,21 @@ public class DirectChannel {
if (retryInfo != null) {
this.conduit.getCancelCriterion().checkCancelInProgress(null);
}
- } while (retryInfo != null);
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- for (Iterator it = totalSentCons.iterator(); it.hasNext();) {
- Connection con = (Connection) it.next();
- con.setInUse(false, 0, 0, 0, null);
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ for (Iterator it = totalSentCons.iterator(); it.hasNext();) {
+ Connection con = (Connection) it.next();
+ con.setInUse(false, 0, 0, 0, null);
+ this.conduit.releasePooledConnection(con);
+ }
+ allCons.clear();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
- }
+ } while (retryInfo != null);
if (failedCe != null) {
throw failedCe;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/BufferAllocator.java b/geode-core/src/main/java/org/apache/geode/internal/shared/BufferAllocator.java
new file mode 100644
index 0000000..663bac0
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/BufferAllocator.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.geode.internal.shared;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+import org.apache.geode.internal.shared.unsafe.FreeMemory;
+import org.apache.geode.internal.shared.unsafe.UnsafeHolder;
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * Allocate, release and expand ByteBuffers (in-place if possible).
+ * <p>
+ * Concrete allocators decide whether the buffers they hand out are heap or
+ * direct (see {@link #isDirect()}) and supply the base object/offset pair
+ * that {@link #fill(ByteBuffer, byte, int, int)} uses for raw memory writes.
+ */
+public abstract class BufferAllocator implements Closeable {
+  /** Shared accessor used by {@link #fill(ByteBuffer, byte, int, int)}. */
+  private static final UnsafeWrapper unsafe = new UnsafeWrapper();
+
+  public static final String STORE_DATA_FRAME_OUTPUT =
+      "STORE_DATA_FRAME_OUTPUT";
+
+  /**
+   * Special owner indicating execution pool memory.
+   */
+  public static final String EXECUTION = "EXECUTION";
+
+  /**
+   * Allocate a new ByteBuffer of given size.
+   *
+   * @param size number of bytes requested
+   * @param owner label identifying who the memory is allocated for
+   */
+  public abstract ByteBuffer allocate(int size, String owner);
+
+  /**
+   * Allocate using the default allocator and fallback to base JDK one in case
+   * allocation fails due to some reason (e.g. system stop).
+   * <p>
+   * This base implementation has no fallback of its own: it simply delegates
+   * to {@link #allocate(int, String)}. Subclasses may override it to add one.
+   */
+  public ByteBuffer allocateWithFallback(int size, String owner) {
+    return allocate(size, owner);
+  }
+
+  /**
+   * Allocate a new ByteBuffer of given size for storage in a Region.
+   */
+  public abstract ByteBuffer allocateForStorage(int size);
+
+  /**
+   * Clears the memory to be zeros immediately after allocation.
+   */
+  public abstract void clearPostAllocate(ByteBuffer buffer);
+
+  /**
+   * Fill the given portion of the buffer setting it with given byte.
+   * <p>
+   * The write goes through the Unsafe wrapper at
+   * {@code baseOffset(buffer) + position}; this method itself performs no
+   * bounds checks, so the caller must pass a range that lies inside the
+   * buffer.
+   *
+   * @param buffer the buffer to write into
+   * @param b the byte value to store
+   * @param position offset relative to the start of the buffer
+   * @param numBytes number of bytes to set
+   */
+  public final void fill(ByteBuffer buffer, byte b, int position, int numBytes) {
+    unsafe.setMemory(baseObject(buffer), baseOffset(buffer) + position,
+        numBytes, b);
+  }
+
+  /**
+   * Fill the buffer from its current position to full capacity with given byte.
+   * Note that the fill extends to {@code capacity()}, not to {@code limit()}.
+   */
+  public final void fill(ByteBuffer buffer, byte b) {
+    final int position = buffer.position();
+    fill(buffer, b, position, buffer.capacity() - position);
+  }
+
+  /**
+   * Get the base object of the ByteBuffer for raw reads/writes by Unsafe API.
+   */
+  public abstract Object baseObject(ByteBuffer buffer);
+
+  /**
+   * Get the base offset of the ByteBuffer for raw reads/writes by Unsafe API.
+   */
+  public abstract long baseOffset(ByteBuffer buffer);
+
+  /**
+   * Expand given ByteBuffer to new capacity. The new buffer is positioned
+   * at the start and caller has to reposition if required.
+   *
+   * @return the new expanded ByteBuffer
+   */
+  public abstract ByteBuffer expand(ByteBuffer buffer, int required,
+      String owner);
+
+  /**
+   * Return the data as a heap byte array. Use of this should be minimal
+   * when no other option exists.
+   * <p>
+   * Copies the bytes between the buffer's position and limit; the buffer's
+   * position is restored before returning.
+   *
+   * @return a new array of length {@code buffer.remaining()}
+   */
+  public byte[] toBytes(ByteBuffer buffer) {
+    final int initPosition = buffer.position();
+    final byte[] bytes = new byte[buffer.remaining()];
+    buffer.get(bytes, 0, bytes.length);
+    // the bulk get advanced the position; put it back where the caller had it
+    buffer.position(initPosition);
+    return bytes;
+  }
+
+  /**
+   * Return a ByteBuffer either copying from, or sharing the given heap bytes.
+   */
+  public abstract ByteBuffer fromBytesToStorage(byte[] bytes, int offset,
+      int length);
+
+  /**
+   * Return a ByteBuffer either sharing data of given ByteBuffer
+   * if its type matches, or else copying from the given ByteBuffer.
+   * <p>
+   * This base implementation always copies: it allocates
+   * {@code buffer.limit()} bytes, copies everything from index 0 up to the
+   * limit, and gives the result the same byte order and position as the
+   * source. The source buffer's position, limit and mark are left untouched.
+   *
+   * @param buffer the buffer whose contents are transferred
+   * @param owner label passed on to {@link #allocate(int, String)}
+   */
+  public ByteBuffer transfer(ByteBuffer buffer, String owner) {
+    final int position = buffer.position();
+    final ByteBuffer newBuffer = allocate(buffer.limit(), owner);
+    // copy through a duplicate so that the caller's buffer is never rewound
+    // (which would drop its mark, and leave it rewound if the copy failed)
+    final ByteBuffer source = buffer.duplicate();
+    source.rewind();
+    newBuffer.order(buffer.order());
+    newBuffer.put(source);
+    newBuffer.position(position);
+    return newBuffer;
+  }
+
+  /**
+   * For direct ByteBuffers the release method is preferred to eagerly release
+   * the memory instead of depending on heap GC which can be delayed.
+   * Instance-level convenience for {@link #releaseBuffer(ByteBuffer)}.
+   */
+  public final void release(ByteBuffer buffer) {
+    releaseBuffer(buffer);
+  }
+
+  /**
+   * For direct ByteBuffers the release method is preferred to eagerly release
+   * the memory instead of depending on heap GC which can be delayed.
+   * <p>
+   * An array-backed buffer is only emptied (position and limit set to 0);
+   * any other buffer is handed to {@code UnsafeHolder.releaseDirectBuffer}.
+   *
+   * @return true if the buffer was passed on for direct-memory release, false
+   *         if it was array-backed and merely emptied
+   */
+  public static boolean releaseBuffer(ByteBuffer buffer) {
+    final boolean hasArray = buffer.hasArray();
+    // Actual release should depend on buffer type and not allocator type.
+    // Reserved off-heap space will be decremented by FreeMemory implementation.
+    if (hasArray) {
+      buffer.rewind().limit(0);
+      return false;
+    } else {
+      UnsafeHolder.releaseDirectBuffer(buffer);
+      return true;
+    }
+  }
+
+  /**
+   * Indicates if this allocator will produce direct ByteBuffers.
+   */
+  public abstract boolean isDirect();
+
+  /**
+   * Return true if this is a managed direct buffer allocator.
+   * The base implementation returns false.
+   */
+  public boolean isManagedDirect() {
+    return false;
+  }
+
+  /**
+   * Allocate a buffer passing a custom FreeMemoryFactory. Requires that
+   * appropriate calls against Spark memory manager have already been done.
+   * Only for managed buffer allocator.
+   *
+   * @throws UnsupportedOperationException always, in this base implementation
+   */
+  public ByteBuffer allocateCustom(int size,
+      FreeMemory.Factory factory) {
+    throw new UnsupportedOperationException("Not supported for " + toString());
+  }
+
+  /**
+   * Any cleanup required at system close.
+   */
+  @Override
+  public abstract void close();
+
+  /**
+   * Compute the capacity to grow to when {@code required} more bytes are
+   * needed on top of {@code currentUsed}: the larger of 1.5 times
+   * {@code currentUsed} and {@code currentUsed + required}, capped at
+   * {@code Integer.MAX_VALUE - 1}.
+   *
+   * @param currentUsed number of bytes currently in use
+   * @param required number of additional bytes needed
+   * @return the new capacity, at least {@code currentUsed + required}
+   * @throws IndexOutOfBoundsException if {@code currentUsed + required}
+   *         exceeds the cap
+   */
+  protected static int expandedSize(int currentUsed, int required) {
+    final long minRequired = (long) currentUsed + required;
+    // increase the size by 50%; use long arithmetic because currentUsed * 3
+    // wraps around in int for very large buffers and would lose the growth
+    final long grownBy50Percent = (currentUsed * 3L) >>> 1;
+    final int newLength = (int) Math.min(Math.max(grownBy50Percent,
+        minRequired), Integer.MAX_VALUE - 1);
+    if (newLength >= minRequired) {
+      return newLength;
+    } else {
+      throw new IndexOutOfBoundsException("Cannot allocate more than " +
+          newLength + " bytes but required " + minRequired);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/ChannelBufferOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/ChannelBufferOutputStream.java
new file mode 100644
index 0000000..eb99838
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/ChannelBufferOutputStream.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * OutputStream for use by buffered write abstractions over a channel. The
+ * implementation is not thread-safe by design. The class itself can be used
+ * as a non-synchronized efficient replacement of BufferedOutputStream.
+ * <p>
+ * The intermediate buffer comes from {@link #allocateBuffer(int)}, which
+ * returns a heap buffer here; subclasses may override it (for example to
+ * supply a direct buffer).
+ * <p>
+ * Note that the close() method of this class does not close the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public class ChannelBufferOutputStream extends OutputStreamChannel {
+
+  /** Staging buffer; always big-endian. Data sits here until flushed. */
+  protected final ByteBuffer buffer;
+
+  public static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+  /**
+   * Create a stream over the channel using {@link #DEFAULT_BUFFER_SIZE}.
+   */
+  public ChannelBufferOutputStream(WritableByteChannel channel)
+      throws IOException {
+    this(channel, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Create a stream over the channel with a staging buffer of the given size.
+   *
+   * @param channel the channel that buffered data is written to
+   * @param bufferSize size of the staging buffer in bytes
+   * @throws IllegalArgumentException if bufferSize is smaller than 32
+   */
+  public ChannelBufferOutputStream(WritableByteChannel channel, int bufferSize)
+      throws IOException {
+    super(channel);
+    if (bufferSize < 32) {
+      throw new IllegalArgumentException("buffer size " + bufferSize +
+          " should be at least 32");
+    }
+    // NOTE: allocateBuffer is overridable and is invoked during construction,
+    // so an override must not depend on subclass fields being initialized
+    this.buffer = allocateBuffer(bufferSize).order(ByteOrder.BIG_ENDIAN);
+  }
+
+  /**
+   * Create the staging buffer. This implementation returns a heap buffer.
+   */
+  protected ByteBuffer allocateBuffer(int bufferSize) {
+    return ByteBuffer.allocate(bufferSize);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The staging buffer is flushed to the channel first if it is full.
+   */
+  @Override
+  public final void write(int b) throws IOException {
+    if (!this.buffer.hasRemaining()) {
+      flushBufferBlocking(this.buffer);
+    }
+    this.buffer.put((byte) (b & 0xff));
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Bytes are copied into the staging buffer, which is flushed to the channel
+   * each time it fills up; the final partial chunk stays buffered.
+   */
+  @Override
+  public final void write(byte[] b,
+      int off, int len) throws IOException {
+    if (len == 1) {
+      write(b[off]);
+      return;
+    }
+
+    while (len > 0) {
+      int remaining = this.buffer.remaining();
+      if (len <= remaining) {
+        this.buffer.put(b, off, len);
+        return;
+      } else {
+        // copy b to buffer and flush
+        if (remaining > 0) {
+          this.buffer.put(b, off, remaining);
+          len -= remaining;
+          off += remaining;
+        }
+        flushBufferBlocking(this.buffer);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int write(ByteBuffer src) throws IOException {
+    return super.writeBuffered(src, this.buffer);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The staging buffer is flushed first if fewer than four bytes are free.
+   */
+  @Override
+  public final void writeInt(int v) throws IOException {
+    if (this.buffer.remaining() < 4) {
+      flushBufferBlocking(this.buffer);
+    }
+    // ByteBuffer will always be big-endian
+    this.buffer.putInt(v);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Does nothing when the staging buffer is empty.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (this.buffer.position() > 0) {
+      flushBufferBlocking(this.buffer);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Reports the state of the underlying channel.
+   */
+  @Override
+  public final boolean isOpen() {
+    return this.channel.isOpen();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Only flushes buffered data; the underlying channel is left open.
+   */
+  @Override
+  public void close() throws IOException {
+    flush();
+  }
+
+  /**
+   * Write the whole content of the given buffer to the channel, looping until
+   * nothing remains, and leave the buffer ready for further puts.
+   *
+   * @param buffer the buffer to drain, in "put" mode on entry and on exit
+   * @throws IOException if writing to the channel fails; bytes not yet
+   *         written are kept at the front of the buffer
+   */
+  protected void flushBufferBlocking(final ByteBuffer buffer)
+      throws IOException {
+    buffer.flip();
+    try {
+      do {
+        writeBuffer(buffer, this.channel);
+      } while (buffer.hasRemaining());
+    } finally {
+      // on failure keep the unwritten tail, otherwise start from scratch
+      if (buffer.hasRemaining()) {
+        buffer.compact();
+      } else {
+        buffer.clear();
+      }
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/InputStreamChannel.java b/geode-core/src/main/java/org/apache/geode/internal/shared/InputStreamChannel.java
new file mode 100644
index 0000000..936aadd
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/InputStreamChannel.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Intermediate class that extends both an InputStream and ReadableByteChannel.
+ * <p>
+ * Besides the channel it wraps, an instance tracks the thread currently
+ * parked waiting for data and a running count of bytes read from the channel.
+ * The counter is updated with a plain read-modify-write on a volatile field,
+ * so it is only accurate when a single thread reads from the stream.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public abstract class InputStreamChannel extends InputStream implements
+    ReadableByteChannel, StreamChannel {
+
+  protected final ReadableByteChannel channel;
+  private volatile Thread parkedThread;
+  protected volatile long bytesRead;
+
+  /**
+   * The default wait to use when waiting to read/write a channel
+   * (when there is no selector to signal)
+   */
+  public static final long PARK_NANOS_FOR_READ_WRITE = 100L;
+
+  /**
+   * Retries before waiting for {@link #PARK_NANOS_FOR_READ_WRITE}
+   * (when there is no selector to signal)
+   */
+  public static final int RETRIES_BEFORE_PARK = 20;
+
+  /**
+   * Maximum nanos to park thread to wait for reading/writing data in
+   * non-blocking mode (if selector is present then it will explicitly signal)
+   */
+  public static final long PARK_NANOS_MAX = 30000000000L;
+
+  /**
+   * Briefly park the calling thread on every {@link #RETRIES_BEFORE_PARK}-th
+   * attempt of a read/write that made no progress.
+   * <p>
+   * While parked, the thread is published through
+   * {@link StreamChannel#setParkedThread(Thread)} when a channel is given.
+   * The timeout bookkeeping adds the nominal
+   * {@link #PARK_NANOS_FOR_READ_WRITE} per park rather than measured elapsed
+   * time.
+   *
+   * @param channel the channel being waited on; may be null, in which case
+   *        no parked thread is published and no timeout is enforced
+   * @param parkedNanos nanos accumulated by earlier calls of this wait
+   * @param numTries number of attempts made so far
+   * @return the updated accumulated park time to pass to the next call
+   * @throws SocketTimeoutException if the accumulated park time exceeds
+   *         {@link StreamChannel#getParkNanosMax()}
+   */
+  static long parkThreadForAsyncOperationIfRequired(
+      final StreamChannel channel, long parkedNanos, int numTries)
+      throws SocketTimeoutException {
+    // at this point we are out of the selector thread and don't want to
+    // create unlimited size buffers upfront in selector, so will use
+    // simple signalling between selector and this thread to proceed
+    if ((numTries % RETRIES_BEFORE_PARK) == 0) {
+      if (channel != null) {
+        channel.setParkedThread(Thread.currentThread());
+      }
+      LockSupport.parkNanos(PARK_NANOS_FOR_READ_WRITE);
+      if (channel != null) {
+        channel.setParkedThread(null);
+        if ((parkedNanos += PARK_NANOS_FOR_READ_WRITE) > channel.getParkNanosMax()) {
+          throw new SocketTimeoutException("Connection operation timed out.");
+        }
+      }
+    }
+    return parkedNanos;
+  }
+
+  protected InputStreamChannel(ReadableByteChannel channel) {
+    this.channel = channel;
+  }
+
+  /**
+   * Get the underlying {@link ReadableByteChannel}.
+   */
+  public final ReadableByteChannel getUnderlyingChannel() {
+    return this.channel;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public abstract int read(ByteBuffer dst) throws IOException;
+
+  /**
+   * Reads four input bytes and returns an <code>int</code> value. Let
+   * <code>a-d</code> be the first through fourth bytes read. The value returned
+   * is:
+   *
+   * <pre>{@code
+   * (((a & 0xff) << 24) | ((b & 0xff) << 16) |
+   *  ((c & 0xff) << 8) | (d & 0xff))
+   * }</pre>
+   *
+   * This method is suitable for reading bytes written by the
+   * <code>writeInt</code> method of interface <code>DataOutput</code>.
+   *
+   * @return the <code>int</code> value read.
+   * @exception EOFException
+   *            if this stream reaches the end before reading all the bytes.
+   * @exception IOException
+   *            if an I/O error occurs.
+   */
+  public abstract int readInt() throws IOException;
+
+  /**
+   * Optional operation; this base implementation always throws.
+   *
+   * @throws UnsupportedOperationException always, unless overridden
+   */
+  public int readFrame() throws IOException {
+    throw new UnsupportedOperationException(getClass().getSimpleName()
+        + ": readFrame not supported");
+  }
+
+  /**
+   * Optional operation; this base implementation always throws.
+   *
+   * @throws UnsupportedOperationException always, unless overridden
+   */
+  public int readFrameFragment(int fragmentSize) throws IOException {
+    throw new UnsupportedOperationException(getClass().getSimpleName()
+        + ": readFrameFragment not supported");
+  }
+
+  /**
+   * Common base method to read into a given ByteBuffer destination via an
+   * intermediate direct byte buffer owned by the implementation of this class.
+   * <p>
+   * Whatever is already buffered is copied first; if that does not fill the
+   * destination, the channel is read at most once more, so the caller should
+   * loop if it needs the destination completely filled.
+   *
+   * @param dst the destination to copy into
+   * @param channelBuffer the intermediate buffer, in "get" mode
+   * @return number of bytes placed in dst, or the (zero or negative) result
+   *         of the channel read when nothing could be delivered
+   */
+  protected final int readBuffered(final ByteBuffer dst,
+      final ByteBuffer channelBuffer) throws IOException {
+    int dstLen = dst.remaining();
+    // first copy anything remaining from buffer
+    final int remaining = channelBuffer.remaining();
+    if (dstLen <= remaining) {
+      if (dstLen > 0) {
+        final int pos = channelBuffer.position();
+        // reduce this buffer's limit temporarily to the required len
+        channelBuffer.limit(pos + dstLen);
+        try {
+          dst.put(channelBuffer);
+        } finally {
+          // restore the limit
+          channelBuffer.limit(pos + remaining);
+        }
+        return dstLen;
+      } else {
+        return 0;
+      }
+    }
+
+    // refill buffer once and read whatever available into buf;
+    // caller should invoke in a loop if buffer is still not full
+    int readBytes = 0;
+    if (remaining > 0) {
+      dst.put(channelBuffer);
+      dstLen -= remaining;
+      readBytes += remaining;
+    }
+    // if dst is a reasonably large direct byte buffer, then refill from
+    // channel directly else use channel direct buffer for best performance
+    if (dstLen >= (channelBuffer.limit() >>> 1) && dst.isDirect()) {
+      final int bufBytes = readIntoBufferNoWait(dst);
+      if (bufBytes > 0) {
+        return (readBytes + bufBytes);
+      } else {
+        return readBytes > 0 ? readBytes : bufBytes;
+      }
+    } else {
+      final int bufBytes = refillBufferBase(channelBuffer, -1, null);
+      if (bufBytes > 0) {
+        if (dstLen >= bufBytes) {
+          dst.put(channelBuffer);
+          return (readBytes + bufBytes);
+        } else {
+          final int pos = channelBuffer.position();
+          // reduce this buffer's limit temporarily to the required length
+          channelBuffer.limit(pos + dstLen);
+          try {
+            dst.put(channelBuffer);
+          } finally {
+            // restore the limit
+            channelBuffer.limit(pos + bufBytes);
+          }
+          return (readBytes + dstLen);
+        }
+      } else {
+        return readBytes > 0 ? readBytes : bufBytes;
+      }
+    }
+  }
+
+  /**
+   * Overridable entry point for refilling the intermediate buffer; delegates
+   * to {@link #refillBufferBase(ByteBuffer, int, String)}.
+   */
+  protected int refillBuffer(final ByteBuffer channelBuffer,
+      final int tryReadBytes, final String eofMessage) throws IOException {
+    return refillBufferBase(channelBuffer, tryReadBytes, eofMessage);
+  }
+
+  /**
+   * Move any unread bytes to the front of the buffer, read more from the
+   * channel behind them and flip the buffer for reading.
+   *
+   * @param channelBuffer the intermediate buffer, in "get" mode on entry and
+   *        on exit
+   * @param tryReadBytes keep reading until at least this many bytes are
+   *        buffered or the buffer is full; a value that is not positive means
+   *        a single read
+   * @param eofMessage if non-null, end-of-stream hit while still trying to
+   *        reach tryReadBytes raises an EOFException with this message
+   * @return total bytes available in the buffer (left-over plus newly read),
+   *         or the channel's end-of-stream result if there are none
+   */
+  protected final int refillBufferBase(final ByteBuffer channelBuffer,
+      final int tryReadBytes, final String eofMessage) throws IOException {
+    resetAndCopyLeftOverBytes(channelBuffer);
+    int initPosition = channelBuffer.position();
+    int totalReadBytes = initPosition;
+    // remember the counter so the bytes read below are accounted exactly once
+    final long countedBefore = this.bytesRead;
+    final int channelBytes = readIntoBuffer(channelBuffer);
+    if (channelBytes > 0) {
+      totalReadBytes += channelBytes;
+    }
+    // eof on stream but we may still have remaining bytes in channelBuffer
+    else if (totalReadBytes == 0) {
+      totalReadBytes = channelBytes;
+    }
+    while (tryReadBytes > totalReadBytes && channelBuffer.hasRemaining()) {
+      int readBytes = readIntoBuffer(channelBuffer);
+      if (readBytes < 0) {
+        if (eofMessage != null) {
+          throw new EOFException(eofMessage);
+        } else {
+          if (totalReadBytes == 0) {
+            totalReadBytes = readBytes;
+          }
+          break;
+        }
+      }
+      if (totalReadBytes < 0) {
+        totalReadBytes = 0;
+      }
+      totalReadBytes += readBytes;
+    }
+    channelBuffer.flip();
+    assert (totalReadBytes == channelBuffer.limit()) || (totalReadBytes == -1
+        && channelBuffer.limit() == 0) : "readBytes=" + totalReadBytes
+            + " != limit=" + channelBuffer.limit();
+    if (totalReadBytes > initPosition) {
+      // readIntoBuffer() in this class already adds what it reads to
+      // bytesRead, so adding the delta again would count every byte twice;
+      // only top the counter up in case an override did not count
+      final long expected = countedBefore + (totalReadBytes - initPosition);
+      if (this.bytesRead < expected) {
+        this.bytesRead = expected;
+      }
+    }
+    return totalReadBytes;
+  }
+
+  /**
+   * Prepare the buffer for appending: unread bytes are compacted to the
+   * front, an exhausted buffer is cleared.
+   */
+  protected void resetAndCopyLeftOverBytes(final ByteBuffer channelBuffer) {
+    if (channelBuffer.hasRemaining()) {
+      channelBuffer.compact();
+    } else {
+      channelBuffer.clear();
+    }
+  }
+
+  /**
+   * Fill the given buffer reading data from channel at least one byte unless
+   * end-of-stream has been reached.
+   * <p>
+   * Reads that return zero bytes are retried, parking briefly between
+   * attempts, unless the buffer has no room left. Bytes read are added to
+   * the byte counter.
+   *
+   * @return number of bytes read or -1 on end-of-stream
+   */
+  protected int readIntoBuffer(ByteBuffer buffer) throws IOException {
+    long parkedNanos = 0L;
+    int numTries = 0;
+    int numBytes;
+    while ((numBytes = this.channel.read(buffer)) == 0) {
+      if (!buffer.hasRemaining()) {
+        break;
+      }
+      // wait for a bit after some retries
+      parkedNanos = parkThreadForAsyncOperationIfRequired(
+          this, parkedNanos, ++numTries);
+    }
+    if (numBytes > 0) {
+      this.bytesRead += numBytes;
+    }
+    return numBytes;
+  }
+
+  @Override
+  public final Thread getParkedThread() {
+    return this.parkedThread;
+  }
+
+  @Override
+  public final void setParkedThread(Thread thread) {
+    this.parkedThread = thread;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * This implementation returns {@link #PARK_NANOS_MAX}.
+   */
+  @Override
+  public long getParkNanosMax() {
+    return PARK_NANOS_MAX;
+  }
+
+  /**
+   * Fill the given buffer reading data from channel whatever is available in
+   * one call (non-blocking if channel is so configured). Bytes read are added
+   * to the byte counter.
+   *
+   * @return number of bytes read (can be zero) or -1 on end-of-stream
+   */
+  protected int readIntoBufferNoWait(ByteBuffer buffer)
+      throws IOException {
+    final int numBytes = this.channel.read(buffer);
+    if (numBytes > 0) {
+      this.bytesRead += numBytes;
+    }
+    return numBytes;
+  }
+
+  /**
+   * Number of bytes read from the underlying channel so far.
+   */
+  public final long getBytesRead() {
+    return this.bytesRead;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/NativeCalls.java b/geode-core/src/main/java/org/apache/geode/internal/shared/NativeCalls.java
old mode 100755
new mode 100644
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/OutputStreamChannel.java b/geode-core/src/main/java/org/apache/geode/internal/shared/OutputStreamChannel.java
new file mode 100644
index 0000000..2c58792
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/OutputStreamChannel.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Intermediate class that extends both an OutputStream and WritableByteChannel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public abstract class OutputStreamChannel extends OutputStream implements
+ WritableByteChannel, StreamChannel {
+
+ protected final WritableByteChannel channel;
+ private final boolean socketToSameHost;
+ private volatile Thread parkedThread;
+ protected volatile long bytesWritten;
+
+ protected OutputStreamChannel(WritableByteChannel channel) {
+ this.channel = channel;
+ this.socketToSameHost = isSocketToSameHost(channel);
+ }
+
+
+ private boolean isSocketToSameHost(Channel channel) {
+ try {
+ if (channel instanceof SocketChannel) {
+ SocketChannel socketChannel = (SocketChannel) channel;
+ return isSocketToSameHost(socketChannel.getLocalAddress(),
+ socketChannel.getRemoteAddress());
+ }
+ } catch (IOException ignored) {
+ }
+ return false;
+ }
+
+ private boolean isSocketToSameHost(SocketAddress localSockAddress,
+ SocketAddress remoteSockAddress) {
+ if ((localSockAddress instanceof InetSocketAddress) &&
+ (remoteSockAddress instanceof InetSocketAddress)) {
+ InetAddress localAddress = ((InetSocketAddress) localSockAddress)
+ .getAddress();
+ return localAddress != null && localAddress.equals(
+ ((InetSocketAddress) remoteSockAddress).getAddress());
+ } else {
+ return false;
+ }
+ }
+
+ public final boolean isSocketToSameHost() {
+ return this.socketToSameHost;
+ }
+
+ /**
+ * Get the underlying {@link WritableByteChannel}.
+ */
+ public final WritableByteChannel getUnderlyingChannel() {
+ return this.channel;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public abstract int write(ByteBuffer src) throws IOException;
+
+ /**
+ * Writes an <code>int</code> value, which is comprised of four bytes,
+ * to the output stream in big-endian format
+ * compatible with {@link java.io.DataOutput#writeInt(int)}.
+ *
+ * @param v the <code>int</code> value to be written.
+ * @throws IOException if an I/O error occurs.
+ * @see java.io.DataOutput#writeInt(int)
+ */
+ public abstract void writeInt(int v) throws IOException;
+
+ /**
+ * Common base method to write a given ByteBuffer source via an intermediate
+ * direct byte buffer owned by the implementation of this class (if required).
+ * <p>
+ * Bytes of <code>src</code> are copied into <code>channelBuffer</code> while
+ * it has room. Once it is full it is flushed to the channel with a single
+ * non-waiting write; if that does not drain it completely the method returns
+ * early with what was consumed so far. A direct <code>src</code> larger than
+ * half of the channel buffer's limit is handed straight to the channel as
+ * soon as the channel buffer is empty.
+ *
+ * @param src the data to write; its position advances by the returned amount
+ * @param channelBuffer the intermediate buffer that is filled and flushed
+ *        (NOTE(review): expected to be in fill mode on entry - confirm with
+ *        the implementations)
+ * @return number of bytes consumed from <code>src</code>, which can be less
+ *         than what it had remaining
+ * @throws IOException if the write to the underlying channel fails
+ */
+ protected final int writeBuffered(final ByteBuffer src,
+ final ByteBuffer channelBuffer) throws IOException {
+ int srcLen = src.remaining();
+ // flush large direct buffers directly into channel
+ final boolean flushBuffer = srcLen > (channelBuffer.limit() >>> 1) &&
+ src.isDirect();
+ int numWritten = 0;
+ while (srcLen > 0) {
+ final int remaining = channelBuffer.remaining();
+ if (srcLen <= remaining) {
+ channelBuffer.put(src);
+ return (numWritten + srcLen);
+ } else {
+ // flush directly if there is nothing in the channel buffer
+ if (flushBuffer && channelBuffer.position() == 0) {
+ return numWritten + writeBufferNoWait(src, this.channel);
+ }
+ // copy src to buffer and flush
+ if (remaining > 0) {
+ // lower limit of src temporarily to remaining
+ final int srcPos = src.position();
+ src.limit(srcPos + remaining);
+ try {
+ channelBuffer.put(src);
+ } finally {
+ // restore the limit
+ src.limit(srcPos + srcLen);
+ }
+ srcLen -= remaining;
+ numWritten += remaining;
+ assert srcLen == src.remaining() : "srcLen=" + srcLen
+ + " srcRemaining=" + src.remaining();
+ }
+ // if we were able to write the full buffer then try writing the
+ // remaining from source else return with whatever was written
+ if (!flushBufferNonBlockingBase(channelBuffer)) {
+ return numWritten;
+ } else if (flushBuffer) {
+ return numWritten + writeBufferNoWait(src, this.channel);
+ }
+ // for non-direct buffers use channel buffer for best performance
+ // so loop back and try again
+ }
+ }
+ return numWritten;
+ }
+
+ protected final boolean flushBufferNonBlockingBase(final ByteBuffer buffer)
+ throws IOException {
+ buffer.flip();
+
+ final boolean flushed;
+ try {
+ writeBufferNoWait(buffer, this.channel);
+ } finally {
+ // if we failed to write the full buffer then compact the remaining bytes
+ // to the start so we can start filling it again
+ if (buffer.hasRemaining()) {
+ buffer.compact();
+ flushed = false;
+ } else {
+ buffer.clear();
+ flushed = true;
+ }
+ }
+ return flushed;
+ }
+
+ protected int writeBuffer(final ByteBuffer buffer,
+ final WritableByteChannel channel) throws IOException {
+ long parkedNanos = 0;
+ int numTries = 0;
+ int numWritten;
+ while ((numWritten = channel.write(buffer)) == 0) {
+ if (!buffer.hasRemaining()) {
+ break;
+ }
+ // wait for a bit after some retries
+ parkedNanos = InputStreamChannel.parkThreadForAsyncOperationIfRequired(
+ this, parkedNanos, ++numTries);
+ }
+ if (numWritten > 0) {
+ this.bytesWritten += numWritten;
+ }
+ return numWritten;
+ }
+
+ @Override
+ public final Thread getParkedThread() {
+ return this.parkedThread;
+ }
+
+ @Override
+ public final void setParkedThread(Thread thread) {
+ this.parkedThread = thread;
+ }
+
+ @Override
+ public long getParkNanosMax() {
+ return InputStreamChannel.PARK_NANOS_MAX;
+ }
+
+ protected int writeBufferNoWait(final ByteBuffer buffer,
+ final WritableByteChannel channel) throws IOException {
+ int numWritten = channel.write(buffer);
+ if (numWritten > 0) {
+ this.bytesWritten += numWritten;
+ }
+ return numWritten;
+ }
+
+ public final long getBytesWritten() {
+ return this.bytesWritten;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/StreamChannel.java b/geode-core/src/main/java/org/apache/geode/internal/shared/StreamChannel.java
new file mode 100644
index 0000000..fd3fc02
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/StreamChannel.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.Closeable;
+import java.nio.channels.Channel;
+
+/**
+ * Common base methods for {@link Channel} read/write operations (usually buffered).
+ */
+public interface StreamChannel extends Channel, Closeable {
+
+ /**
+ * Return the thread if waiting for read/write on this channel.
+ */
+ Thread getParkedThread();
+
+ /**
+ * Set the thread waiting for read/write on this channel (null to clear).
+ */
+ void setParkedThread(Thread thread);
+
+ /**
+ * Maximum time to wait before giving up read/write on channel
+ */
+ long getParkNanosMax();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferInputStream.java
new file mode 100644
index 0000000..d5727e6
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferInputStream.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Base class for use by buffered abstractions over channel using direct byte
+ * buffers. This particular class can be used as a much more efficient
+ * replacement for BufferedInputStream.
+ * <p>
+ * Note that the close() method of this class does not close the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.0
+ */
+public class ChannelBufferInputStream extends InputStreamChannel {
+
+ protected final ByteBuffer buffer;
+
+ public static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ public ChannelBufferInputStream(ReadableByteChannel channel) throws IOException {
+ this(channel, DEFAULT_BUFFER_SIZE);
+ }
+
+ public ChannelBufferInputStream(ReadableByteChannel channel, int bufferSize)
+ throws IOException {
+ super(channel);
+ if (bufferSize <= 0) {
+ throw new IllegalArgumentException("invalid bufferSize=" + bufferSize);
+ }
+ this.buffer = allocateBuffer(bufferSize);
+ // flip to force refill on first use
+ this.buffer.flip();
+ }
+
+ protected ByteBuffer allocateBuffer(int bufferSize) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+
+ /**
+  * Read a single byte, refilling the buffer from the channel when it is
+  * exhausted.
+  *
+  * @return the byte as an unsigned value (0-255), or -1 if the refill
+  *         yielded no data
+  */
+ @Override
+ public final int read() throws IOException {
+   if (!this.buffer.hasRemaining()
+       && refillBuffer(this.buffer, 1, null) <= 0) {
+     return -1;
+   }
+   return this.buffer.get() & 0xff;
+ }
+
+ /**
+  * Read up to <code>len</code> bytes into the given array. Whatever is
+  * already buffered is handed out first; if that is not enough the buffer is
+  * refilled at most once, so fewer bytes than requested may be returned and
+  * callers needing more should invoke this in a loop.
+  *
+  * @return number of bytes copied, 0 for a non-positive <code>len</code>, or
+  *         the non-positive refill result when nothing at all could be read
+  */
+ @Override
+ public final int read(byte[] buf, int off, int len) throws IOException {
+   // dedicated path for single byte reads
+   if (len == 1) {
+     if (!this.buffer.hasRemaining()
+         && refillBuffer(this.buffer, 1, null) <= 0) {
+       return -1;
+     }
+     buf[off] = this.buffer.get();
+     return 1;
+   }
+
+   // request fully served by what is already buffered
+   final int buffered = this.buffer.remaining();
+   if (len <= buffered) {
+     if (len <= 0) {
+       return 0;
+     }
+     this.buffer.get(buf, off, len);
+     return len;
+   }
+
+   // drain the buffer, then refill once and copy whatever became available
+   if (buffered > 0) {
+     this.buffer.get(buf, off, buffered);
+   }
+   final int refilled = refillBuffer(this.buffer, 1, null);
+   if (refilled <= 0) {
+     return buffered > 0 ? buffered : refilled;
+   }
+   final int extra = Math.min(len - buffered, refilled);
+   this.buffer.get(buf, off + buffered, extra);
+   return buffered + extra;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int read(ByteBuffer dst) throws IOException {
+ return super.readBuffered(dst, this.buffer);
+ }
+
+ /**
+  * Read a four byte integer from the buffer, first refilling it so that at
+  * least four bytes are requested when fewer are currently buffered.
+  */
+ @Override
+ public final int readInt() throws IOException {
+   final ByteBuffer buf = this.buffer;
+   if (buf.remaining() < 4) {
+     refillBuffer(buf, 4, "readInt: premature end of stream");
+   }
+   return buf.getInt();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int available() throws IOException {
+ return this.buffer.remaining();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public final boolean isOpen() {
+ return this.channel.isOpen();
+ }
+
+ /**
+  * Discards any buffered data. The underlying channel is not closed.
+  * <p>
+  * The buffer is put back into the same empty read-mode state (position and
+  * limit both zero) that the constructor establishes. A bare
+  * <code>clear()</code> would set the limit to the capacity, which makes
+  * {@link #available()} report the full buffer size and lets subsequent reads
+  * return stale bytes instead of going to the channel.
+  *
+  * @throws IOException never thrown by this implementation; declared for
+  *         overriding classes
+  */
+ @Override
+ public void close() throws IOException {
+   this.buffer.clear();
+   // flip right after clear leaves position == limit == 0: nothing to read
+   this.buffer.flip();
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataInputStream.java
new file mode 100644
index 0000000..d5f187b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataInputStream.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * A buffered DataInput abstraction over channel using direct byte buffers, and
+ * using internal Unsafe class for best performance.
+ * <p>
+ * The implementation is not thread-safe by design. This particular class can be
+ * used as an efficient, buffered DataInput implementation for file channels,
+ * socket channels and other similar.
+ *
+ * @author swale
+ * @since gfxd 1.0
+ */
+public class ChannelBufferUnsafeDataInputStream extends
+ ChannelBufferUnsafeInputStream implements DataInput {
+ private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+ public ChannelBufferUnsafeDataInputStream(ReadableByteChannel channel,
+ int bufferSize) {
+ super(channel, bufferSize);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ /**
+  * Keep reading until <code>len</code> bytes have been stored into the array
+  * starting at <code>off</code>.
+  *
+  * @throws EOFException if the stream reports a negative count before the
+  *         requested number of bytes has been read
+  */
+ @Override
+ public final void readFully(byte[] b,
+     int off, int len) throws IOException {
+   int pos = off;
+   int pending = len;
+   for (;;) {
+     final int count = super.read(b, pos, pending);
+     if (count >= pending) {
+       return;
+     }
+     if (count < 0) {
+       throw new EOFException();
+     }
+     // partial read: advance and ask for the rest
+     pos += count;
+     pending -= count;
+   }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int skipBytes(int n) {
+ return (int) skip(n);
+ }
+
+ /**
+  * Skip over bytes that are already buffered. This never reads from the
+  * channel, so at most the currently buffered amount is skipped.
+  *
+  * @return number of bytes actually skipped (zero for a negative request or
+  *         an empty buffer)
+  */
+ @Override
+ public long skip(long n) {
+   final long buffered = this.addrLimit - this.addrPosition;
+   final long skipped = Math.max(0, Math.min(n, buffered));
+   this.addrPosition += skipped;
+   return skipped;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean readBoolean() throws IOException {
+ return readByte() != 0;
+ }
+
+ /**
+  * Read one byte from the current buffer address, refilling the buffer first
+  * when the read position has reached its limit.
+  */
+ @Override
+ public final byte readByte() throws IOException {
+   if (this.addrLimit <= this.addrPosition) {
+     refillBuffer(this.buffer, 1, "readByte: premature end of stream");
+   }
+   final long addr = this.addrPosition;
+   this.addrPosition = addr + 1;
+   return unsafe.getByte(null, addr);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int readUnsignedByte() throws IOException {
+ return (readByte() & 0xff);
+ }
+
+ /**
+  * Read a big-endian two byte value from the current buffer address,
+  * refilling the buffer first when fewer than two bytes are buffered.
+  */
+ @Override
+ public final short readShort() throws IOException {
+   if ((this.addrLimit - this.addrPosition) < 2) {
+     refillBuffer(this.buffer, 2, "readShort: premature end of stream");
+   }
+   final long addr = this.addrPosition;
+   this.addrPosition = addr + 2;
+   final short raw = unsafe.getShort(null, addr);
+   // the stream is big-endian, so swap bytes on little-endian platforms
+   return UnsafeHolder.littleEndian ? Short.reverseBytes(raw) : raw;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int readUnsignedShort() throws IOException {
+ return (readShort() & 0xFFFF);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final char readChar() throws IOException {
+ return (char) readShort();
+ }
+
+ /**
+  * Read a big-endian eight byte value from the current buffer address,
+  * refilling the buffer first when fewer than eight bytes are buffered.
+  */
+ @Override
+ public final long readLong() throws IOException {
+   if ((this.addrLimit - this.addrPosition) < 8) {
+     refillBuffer(this.buffer, 8, "readLong: premature end of stream");
+   }
+   final long addr = this.addrPosition;
+   this.addrPosition = addr + 8;
+   final long raw = unsafe.getLong(null, addr);
+   // the stream is big-endian, so swap bytes on little-endian platforms
+   return UnsafeHolder.littleEndian ? Long.reverseBytes(raw) : raw;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String readLine() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String readUTF() throws IOException {
+ return DataInputStream.readUTF(this);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataOutputStream.java
new file mode 100644
index 0000000..01d89b8
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataOutputStream.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+
+/**
+ * A buffered DataOutput abstraction over channel using direct byte buffers, and
+ * using internal Unsafe class for best performance. Users must check for
+ * {@link UnsafeHolder#hasUnsafe()} before trying to use this class.
+ * <p>
+ * The implementation is not thread-safe by design. This particular class can be
+ * used as an efficient, buffered DataOutput implementation for file channels,
+ * socket channels and other similar.
+ *
+ * @author swale
+ * @since gfxd 1.0
+ */
+public class ChannelBufferUnsafeDataOutputStream extends
+ ChannelBufferUnsafeOutputStream implements DataOutput {
+ private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+ public ChannelBufferUnsafeDataOutputStream(WritableByteChannel channel) {
+ super(channel);
+ }
+
+ public ChannelBufferUnsafeDataOutputStream(WritableByteChannel channel,
+ int bufferSize) {
+ super(channel, bufferSize);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void writeBoolean(boolean v) throws IOException {
+ putByte(v ? (byte) 1 : 0);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void writeByte(int v) throws IOException {
+ putByte((byte) (v & 0xff));
+ }
+
+ /**
+  * Write the low two bytes of the value in big-endian order, flushing the
+  * buffer first when fewer than two bytes of space are left.
+  */
+ @Override
+ public final void writeShort(int v) throws IOException {
+   if ((this.addrLimit - this.addrPosition) < 2) {
+     flushBufferBlocking(this.buffer);
+   }
+   // position is re-read here since a flush moves it
+   this.addrPosition = putShort(this.addrPosition, v);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void writeChar(int v) throws IOException {
+ writeShort(v);
+ }
+
+ /**
+  * Write the eight bytes of the value in big-endian order, flushing the
+  * buffer first when fewer than eight bytes of space are left.
+  */
+ @Override
+ public final void writeLong(long v) throws IOException {
+   if ((this.addrLimit - this.addrPosition) < 8) {
+     flushBufferBlocking(this.buffer);
+   }
+   // position is re-read here since a flush moves it
+   this.addrPosition = putLong(this.addrPosition, v);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ /**
+  * Writes one byte per character of the string, namely the low-order eight
+  * bits of each character, as specified by
+  * {@link DataOutput#writeBytes(String)}.
+  * <p>
+  * The bytes are extracted explicitly rather than through a charset: encoding
+  * with US-ASCII replaces every character above 0x7F with '?' instead of
+  * keeping its low-order byte.
+  *
+  * @param s the string to write; an empty string writes nothing
+  * @throws IOException if the underlying write fails
+  */
+ @Override
+ public final void writeBytes(String s) throws IOException {
+   final int len = s.length();
+   if (len > 0) {
+     final byte[] bytes = new byte[len];
+     for (int i = 0; i < len; i++) {
+       // narrowing cast keeps exactly the low-order eight bits of the char
+       bytes[i] = (byte) s.charAt(i);
+     }
+     write(bytes);
+   }
+ }
+
+ /**
+  * Write every character of the string as a big-endian two byte value,
+  * flushing the buffer whenever the remaining characters do not fit into
+  * the space that is left.
+  */
+ @Override
+ public final void writeChars(String s) throws IOException {
+   final int total = s.length();
+   int index = 0;
+   while (index < total) {
+     long addr = this.addrPosition;
+     final int space = (int) (this.addrLimit - addr);
+     final int pending = total - index;
+     final boolean fits = (pending << 1) <= space;
+     // either everything that is left, or as many chars as the space allows
+     final int stop = index + (fits ? pending : (space >>> 1));
+     while (index < stop) {
+       addr = putShort(addr, s.charAt(index++));
+     }
+     this.addrPosition = addr;
+     if (fits) {
+       return;
+     }
+     flushBufferBlocking(this.buffer);
+   }
+ }
+
+ /**
+  * Compute the number of bytes the first <code>strLen</code> characters of
+  * the string occupy in the encoding produced by
+  * {@link #writeUTFSegmentNoOverflow}: one byte for 0x0001-0x007F, three
+  * bytes above 0x07FF and two bytes otherwise (including the zero char).
+  */
+ private int getUTFLength(final String str, final int strLen) {
+   int total = 0;
+   for (int i = 0; i < strLen; i++) {
+     final char ch = str.charAt(i);
+     if (ch > 0x07FF) {
+       total += 3;
+     } else if ((ch >= 0x0001) && (ch <= 0x007F)) {
+       total += 1;
+     } else {
+       total += 2;
+     }
+   }
+   return total;
+ }
+
+
+ /**
+  * Writes the string as a two byte big-endian length followed by its
+  * characters in the one/two/three byte encoding produced by
+  * {@link #writeUTFSegmentNoOverflow}.
+  * <p>
+  * If the worst case size (2 + 3 bytes per char) fits into the free space of
+  * the buffer, the characters are written first and the length is patched in
+  * afterwards. Otherwise the encoded length is computed up front and the
+  * string is written in segments, flushing the buffer between them.
+  *
+  * @param str the string to write
+  * @throws UTFDataFormatException if the string has more than 65535
+  *         characters or its encoded form is longer than 65535 bytes
+  * @throws IOException if flushing the buffer fails
+  */
+ @Override
+ public final void writeUTF(String str) throws IOException {
+   int strLen = str.length();
+   if (strLen > 65535) {
+     throw new UTFDataFormatException("encoded string too long: " + strLen);
+   }
+
+   // first check the optimistic case where worst case of 2 for length + 3 for
+   // each char fits into remaining space in buffer
+   long addrPos = this.addrPosition;
+   long remaining = this.addrLimit - addrPos;
+   if (remaining >= ((strLen * 3) + 2)) {
+     // write the UTF string skipping the length, then write length at the last
+     addrPos += 2;
+     final long finalAddrPos = writeUTFSegmentNoOverflow(str, 0, strLen,
+         -1, null, addrPos);
+     long utflen = (finalAddrPos - addrPos);
+     if (utflen <= 65535) {
+       putShort(addrPos - 2, (int) utflen);
+       this.addrPosition = finalAddrPos;
+     } else {
+       // act as if we wrote nothing to this buffer (no change to addrPosition)
+       throw new UTFDataFormatException("encoded string too long: " + utflen
+           + " bytes");
+     }
+     return;
+   }
+
+   // otherwise first calculate the UTF encoded length, write it in buffer
+   // (which may need to be flushed at any point), then break string into worst
+   // case segments for writing to buffer and flushing if end of buffer reached
+   int utfLen = getUTFLength(str, strLen);
+   if (utfLen > 65535) {
+     throw new UTFDataFormatException("encoded string too long: " + utfLen
+         + " bytes");
+   }
+   // write the length first
+   if (remaining > 2) {
+     addrPos = putShort(addrPos, utfLen);
+     remaining -= 2;
+   } else {
+     flushBufferBlocking(this.buffer);
+     addrPos = putShort(this.addrPosition, utfLen);
+     remaining = this.addrLimit - addrPos;
+   }
+
+   // next break string into segments assuming worst case of 3 bytes per char,
+   // flushing buffer as required after each segment write
+   int offset = 0;
+   while (strLen > 0) {
+     int writeLen = Math.min(strLen, (int) (remaining / 3));
+     // Tiny segments are avoided by flushing instead, but the final one or
+     // two characters of the string must still be written when they fit:
+     // flushing can never raise writeLen above strLen, so requiring three
+     // characters for the tail as well would loop forever.
+     if (writeLen >= 3 || writeLen == strLen) {
+       // write the UTF segment and update the number of remaining characters,
+       // offset, remaining buffer size etc
+       long newAddrPos = writeUTFSegmentNoOverflow(str, offset, writeLen,
+           -1, null, addrPos);
+       strLen -= writeLen;
+       offset += writeLen;
+       remaining -= (newAddrPos - addrPos);
+       addrPos = newAddrPos;
+     } else {
+       // if we have too few to write then better to flush the buffer and then
+       // try (bufferSize is at least 10 as ensured in constructors)
+       this.addrPosition = addrPos;
+       flushBufferBlocking(this.buffer);
+       remaining = this.addrLimit - (addrPos = this.addrPosition);
+     }
+   }
+   this.addrPosition = addrPos;
+ }
+
+ /**
+  * Encode <code>length</code> characters of the string starting at
+  * <code>offset</code> to the given target/address without any bounds check;
+  * the caller must guarantee enough space. When <code>utfLen</code> equals
+  * <code>length</code> every character is written as a single byte.
+  *
+  * @return the address just past the last byte written
+  */
+ public static long writeUTFSegmentNoOverflow(String str, int offset,
+     int length, final int utfLen, final Object target, long addrPos) {
+   final int end = (offset + length);
+   // fast path for ASCII strings
+   if (length == utfLen) {
+     for (int i = offset; i < end; i++) {
+       unsafe.putByte(target, addrPos++, (byte) str.charAt(i));
+     }
+     return addrPos;
+   }
+   for (int i = offset; i < end; i++) {
+     final char ch = str.charAt(i);
+     if (ch > 0x07FF) {
+       // three byte form
+       unsafe.putByte(target, addrPos++, (byte) (0xE0 | ((ch >> 12) & 0x0F)));
+       unsafe.putByte(target, addrPos++, (byte) (0x80 | ((ch >> 6) & 0x3F)));
+       unsafe.putByte(target, addrPos++, (byte) (0x80 | (ch & 0x3F)));
+     } else if ((ch >= 0x0001) && (ch <= 0x007F)) {
+       // single byte form
+       unsafe.putByte(target, addrPos++, (byte) ch);
+     } else {
+       // two byte form (also used for the zero char)
+       unsafe.putByte(target, addrPos++, (byte) (0xC0 | ((ch >> 6) & 0x1F)));
+       unsafe.putByte(target, addrPos++, (byte) (0x80 | (ch & 0x3F)));
+     }
+   }
+   return addrPos;
+ }
+
+ /** Write a short in big-endian format on given off-heap address. */
+ protected static long putShort(long addrPos, final int v) {
+ if (UnsafeHolder.littleEndian) {
+ unsafe.putShort(null, addrPos, Short.reverseBytes((short) v));
+ } else {
+ unsafe.putShort(null, addrPos, (short) v);
+ }
+ return addrPos + 2;
+ }
+
+ /**
+  * Store the value in big-endian order at the given off-heap address.
+  *
+  * @return the address just past the written value
+  */
+ protected static long putLong(long addrPos, final long v) {
+   unsafe.putLong(null, addrPos,
+       UnsafeHolder.littleEndian ? Long.reverseBytes(v) : v);
+   return addrPos + 8;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeInputStream.java
new file mode 100644
index 0000000..9be7956
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeInputStream.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.geode.internal.shared.ChannelBufferInputStream;
+import org.apache.geode.internal.shared.InputStreamChannel;
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * A more efficient implementation of {@link ChannelBufferInputStream}
+ * using internal unsafe class (~30% in raw read calls).
+ * Use {@link UnsafeHolder#newChannelBufferInputStream} method to
+ * create either this or {@link ChannelBufferInputStream} depending on
+ * availability.
+ * <p>
+ * Note that the close() method of this class does not close the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public class ChannelBufferUnsafeInputStream extends InputStreamChannel {
+ private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+ protected ByteBuffer buffer;
+ protected final long baseAddress;
+ /**
+ * Actual buffer position (+baseAddress) accounting is done by this. Buffer
+ * position is adjusted during refill and other places where required using
+ * this.
+ */
+ protected long addrPosition;
+ protected long addrLimit;
+
+ public ChannelBufferUnsafeInputStream(ReadableByteChannel channel) {
+ this(channel, ChannelBufferInputStream.DEFAULT_BUFFER_SIZE);
+ }
+
+ public ChannelBufferUnsafeInputStream(ReadableByteChannel channel,
+ int bufferSize) {
+ super(channel);
+ if (bufferSize <= 0) {
+ throw new IllegalArgumentException("invalid bufferSize=" + bufferSize);
+ }
+ this.buffer = allocateBuffer(bufferSize);
+ // force refill on first use
+ this.buffer.position(bufferSize);
+
+ try {
+ this.baseAddress = UnsafeHolder.getDirectBufferAddress(this.buffer);
+ resetBufferPositions();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "failed in creating an 'unsafe' buffered channel stream", e);
+ }
+ }
+
+ /**
+ * Recompute the raw address position and limit from the current position
+ * and limit of the ByteBuffer.
+ */
+ protected final void resetBufferPositions() {
+ final long base = this.baseAddress;
+ final ByteBuffer current = this.buffer;
+ this.addrPosition = base + current.position();
+ this.addrLimit = base + current.limit();
+ }
+
+ /**
+ * Obtain the buffer used by this stream from the current
+ * DirectBufferAllocator and switch it to the native byte order.
+ *
+ * @param bufferSize requested size of the buffer
+ * @return the allocated buffer in native byte order
+ */
+ protected ByteBuffer allocateBuffer(int bufferSize) {
+ // go through the allocator rather than allocating directly; native
+ // order is set explicitly to skip any byte order conversions
+ return DirectBufferAllocator.instance()
+ .allocateWithFallback(bufferSize, "CHANNELINPUT")
+ .order(ByteOrder.nativeOrder());
+ }
+
+ /**
+ * Read the next byte, refilling the buffer first when everything in it
+ * has been consumed.
+ *
+ * @return the byte as a value in the range 0 to 255, or -1 when the
+ * refill reports that nothing was read
+ */
+ @Override
+ public final int read() throws IOException {
+ final boolean exhausted = this.addrPosition >= this.addrLimit;
+ if (exhausted && refillBuffer(this.buffer, 1, null) <= 0) {
+ return -1;
+ }
+ return unsafe.getByte(null, this.addrPosition++) & 0xff;
+ }
+
+ /**
+ * Copy up to len bytes into buf starting at off. Bytes already buffered
+ * are handed out first and the buffer is refilled (once) only when it is
+ * empty, so fewer than len bytes may be returned; callers that need more
+ * have to invoke this in a loop.
+ *
+ * @return the number of bytes copied, 0 when nothing was requested, or
+ * the non-positive result of the refill when no data could be read
+ */
+ private int read_(byte[] buf, int off, int len) throws IOException {
+ if (len == 1) {
+ // single byte request goes through read()
+ final int single = read();
+ if (single == -1) {
+ return -1;
+ }
+ buf[off] = (byte) single;
+ return 1;
+ }
+
+ final int buffered = (int) (this.addrLimit - this.addrPosition);
+ if (len <= buffered) {
+ // the request is fully served by what is already buffered
+ return len > 0 ? copyBuffered(buf, off, len) : 0;
+ }
+ if (buffered > 0) {
+ // hand out the buffered remainder without reading from the channel
+ return copyBuffered(buf, off, buffered);
+ }
+
+ // nothing buffered: refill once and return whatever became available
+ final int filled = refillBuffer(this.buffer, 1, null);
+ if (filled <= 0) {
+ return filled;
+ }
+ return copyBuffered(buf, off, Math.min(len, filled));
+ }
+
+ /**
+ * Copy count bytes from the current raw position into buf at off and
+ * advance the raw position by the same amount.
+ */
+ private int copyBuffered(byte[] buf, int off, int count) {
+ unsafe.copyMemory(null, this.addrPosition, buf,
+ unsafe.arrayBaseOffset(byte[].class) + off, count);
+ this.addrPosition += count;
+ return count;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int read(byte[] buf) throws IOException {
+ return read_(buf, 0, buf.length);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int read(byte[] buf,
+ int off, int len) throws IOException {
+ UnsafeHolder.checkBounds(buf.length, off, len);
+ return read_(buf, off, len);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to the base class readBuffered using this stream's own
+ * ByteBuffer. The ByteBuffer position is synced from the raw address
+ * position before the call, and the raw position and limit are recomputed
+ * from the ByteBuffer afterwards, also when the call throws.
+ */
+ @Override
+ public final int read(ByteBuffer dst) throws IOException {
+ // We will just use our ByteBuffer for the read. It might be possible
+ // to get slight performance advantage in using unsafe instead, but
+ // copying to ByteBuffer will not be efficient without reflection
+ // to get dst's native address in case it is a direct byte buffer.
+ // Avoiding the complication since the benefit will be very small
+ // in any case (and reflection cost may well offset that).
+ // We can use unsafe for a small perf benefit for heap byte buffers.
+
+ // adjust this buffer position first
+ this.buffer.position((int) (this.addrPosition - this.baseAddress));
+ try {
+ // now we are set to just call base class method
+ return super.readBuffered(dst, this.buffer);
+ } finally {
+ // finally reset the raw positions from buffer
+ resetBufferPositions();
+ }
+ }
+
+ /**
+ * Read four bytes as a big-endian int, refilling the buffer first when
+ * fewer than four bytes are buffered.
+ */
+ @Override
+ public final int readInt() throws IOException {
+ if ((this.addrLimit - this.addrPosition) < 4) {
+ refillBuffer(this.buffer, 4, "readInt: premature end of stream");
+ }
+ // pick up the position only now since a refill may have moved it
+ final long readAt = this.addrPosition;
+ this.addrPosition = readAt + 4;
+ final int raw = unsafe.getInt(null, readAt);
+ return UnsafeHolder.littleEndian ? Integer.reverseBytes(raw) : raw;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final int available() {
+ return (int) (this.addrLimit - this.addrPosition);
+ }
+
+ @Override
+ protected int refillBuffer(final ByteBuffer channelBuffer,
+ final int tryReadBytes, final String eofMessage) throws IOException {
+ // adjust this buffer position first
+ channelBuffer.position((int) (this.addrPosition - this.baseAddress));
+ try {
+ return super.refillBuffer(channelBuffer, tryReadBytes, eofMessage);
+ } finally {
+ // adjust back position and limit
+ resetBufferPositions();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean isOpen() {
+ return this.channel.isOpen();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Zeroes the raw position and limit, drops this stream's reference to the
+ * buffer and hands the buffer to the DirectBufferAllocator for release.
+ * The channel is not touched here. A repeated call finds the buffer
+ * reference already null and does nothing.
+ */
+ @Override
+ public void close() {
+ final ByteBuffer buffer = this.buffer;
+ if (buffer != null) {
+ this.addrPosition = this.addrLimit = 0;
+ this.buffer = null;
+ DirectBufferAllocator.instance().release(buffer);
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeOutputStream.java
new file mode 100644
index 0000000..a923d08
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeOutputStream.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.geode.internal.shared.ChannelBufferOutputStream;
+import org.apache.geode.internal.shared.OutputStreamChannel;
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * A somewhat more efficient implementation of {@link ChannelBufferOutputStream}
+ * using internal unsafe class (~30% in raw single byte write calls).
+ * Use {@link UnsafeHolder#newChannelBufferOutputStream} method to create
+ * either this or {@link ChannelBufferOutputStream} depending on availability.
+ * <p>
+ * NOTE: THIS CLASS IS NOT THREAD-SAFE BY DESIGN. IF IT IS USED CONCURRENTLY
+ * BY MULTIPLE THREADS THEN BAD THINGS CAN HAPPEN DUE TO UNSAFE MEMORY WRITES.
+ * <p>
+ * Note that the close() method of this class does not close the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public class ChannelBufferUnsafeOutputStream extends OutputStreamChannel {
+ private static UnsafeWrapper unsafe = new UnsafeWrapper();
+ protected ByteBuffer buffer;
+ protected final long baseAddress;
+ /**
+ * Actual buffer position (+baseAddress) accounting is done by this. Buffer
+ * position is adjusted during refill and other places where required using
+ * this.
+ */
+ protected long addrPosition;
+ protected long addrLimit;
+
+ /**
+ * Some minimum buffer size, particularly for longs and encoding UTF strings
+ * efficiently. If reducing this, then consider the logic in
+ * {@link ChannelBufferUnsafeDataOutputStream#writeUTF(String)} carefully.
+ */
+ protected static final int MIN_BUFFER_SIZE = 32;
+
+ public ChannelBufferUnsafeOutputStream(WritableByteChannel channel) {
+ this(channel, ChannelBufferOutputStream.DEFAULT_BUFFER_SIZE);
+ }
+
+ public ChannelBufferUnsafeOutputStream(WritableByteChannel channel,
+ int bufferSize) {
+ super(channel);
+ this.baseAddress = allocateBuffer(bufferSize);
+ resetBufferPositions();
+ }
+
+ /**
+ * Get handle to the underlying ByteBuffer. ONLY TO BE USED BY TESTS.
+ */
+ public ByteBuffer getInternalBuffer() {
+ // set the current position
+ this.buffer.position(position());
+ return this.buffer;
+ }
+
+ /**
+ * Recompute the raw address position and limit from the current position
+ * and limit of the ByteBuffer.
+ */
+ protected final void resetBufferPositions() {
+ final long base = this.baseAddress;
+ final ByteBuffer current = this.buffer;
+ this.addrPosition = base + current.position();
+ this.addrLimit = base + current.limit();
+ }
+
+ protected long allocateBuffer(int bufferSize) {
+ // reject any bufferSize below MIN_BUFFER_SIZE
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ throw new IllegalArgumentException(
+ "ChannelBufferUnsafeDataOutputStream: buffersize=" + bufferSize
+ + " too small (minimum " + MIN_BUFFER_SIZE + ')');
+ }
+ // use allocator which will restrict total allocated size
+ final ByteBuffer buffer = DirectBufferAllocator.instance().allocateWithFallback(
+ bufferSize, "CHANNELOUTPUT");
+ // set the order to native explicitly to skip any byte order conversions
+ buffer.order(ByteOrder.nativeOrder());
+ this.buffer = buffer;
+
+ try {
+ return UnsafeHolder.getDirectBufferAddress(buffer);
+ } catch (Exception e) {
+ releaseBuffer();
+ throw new RuntimeException(
+ "failed in creating an 'unsafe' buffered channel stream", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void write(int b) throws IOException {
+ putByte((byte) (b & 0xff));
+ }
+
+ /**
+ * Copy len bytes of b starting at off into the buffer, flushing the
+ * buffer to the channel each time it fills up before all bytes have been
+ * copied.
+ */
+ protected final void write_(byte[] b, int off, int len) throws IOException {
+ if (len == 1) {
+ putByte(b[off]);
+ return;
+ }
+
+ while (len > 0) {
+ final long dest = this.addrPosition;
+ final int space = (int) (this.addrLimit - dest);
+ if (space >= len) {
+ // everything left fits into the buffer
+ unsafe.copyMemory(b, unsafe.arrayBaseOffset(byte[].class) + off,
+ null, dest, len);
+ this.addrPosition = dest + len;
+ return;
+ }
+ if (space > 0) {
+ // fill up the buffer with as much as fits
+ unsafe.copyMemory(b, unsafe.arrayBaseOffset(byte[].class) + off,
+ null, dest, space);
+ this.addrPosition = dest + space;
+ off += space;
+ len -= space;
+ }
+ // buffer is full: drain it before copying the rest
+ flushBufferBlocking(this.buffer);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void write(byte[] b) throws IOException {
+ write_(b, 0, b.length);
+ }
+
+ /**
+ * Append a single byte to the buffer, flushing the buffer first when no
+ * room is left in it.
+ */
+ protected final void putByte(byte b) throws IOException {
+ if (this.addrLimit <= this.addrPosition) {
+ flushBufferBlocking(this.buffer);
+ }
+ // read the position only after the flush, which resets it
+ final long dest = this.addrPosition;
+ this.addrPosition = dest + 1;
+ unsafe.putByte(null, dest, b);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final void write(byte[] b,
+ int off, int len) throws IOException {
+ UnsafeHolder.checkBounds(b.length, off, len);
+ write_(b, off, len);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to the base class writeBuffered using this stream's own
+ * ByteBuffer. The ByteBuffer position is synced from the raw address
+ * position before the call, and the raw position and limit are recomputed
+ * from the ByteBuffer afterwards, also when the call throws.
+ */
+ @Override
+ public final int write(ByteBuffer src) throws IOException {
+ // We will just use our ByteBuffer for the write. It might be possible
+ // to get slight performance advantage in using unsafe instead, but
+ // copying from source ByteBuffer will not be efficient without
+ // reflection to get src's native address in case it is a direct
+ // byte buffer. Avoiding the complication since the benefit will be
+ // very small in any case (and reflection cost may well offset that).
+
+ // adjust this buffer position first
+ this.buffer.position((int) (this.addrPosition - this.baseAddress));
+ // now we are actually set to just call base class method
+ try {
+ return super.writeBuffered(src, this.buffer);
+ } finally {
+ // finally reset the raw positions from buffer
+ resetBufferPositions();
+ }
+ }
+
+ /**
+ * Write the value as four bytes in big-endian order, flushing the buffer
+ * first when fewer than four bytes of room are left in it.
+ */
+ @Override
+ public final void writeInt(int v) throws IOException {
+ final boolean roomForInt = (this.addrLimit - this.addrPosition) >= 4;
+ if (!roomForInt) {
+ flushBufferBlocking(this.buffer);
+ }
+ // position is read after the potential flush, which resets it
+ this.addrPosition = putInt(this.addrPosition, v);
+ }
+
+ public final int position() {
+ return (int) (this.addrPosition - this.baseAddress);
+ }
+
+ /**
+ * Write out anything buffered to the channel. Does nothing when the raw
+ * position is not past the start of the buffer or when the buffer has
+ * already been released.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (this.addrPosition <= this.baseAddress) {
+ // nothing buffered
+ return;
+ }
+ final ByteBuffer pending = this.buffer;
+ if (pending != null) {
+ flushBufferBlocking(pending);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean isOpen() {
+ return this.channel.isOpen();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Flushes buffered data, zeroes the raw position and limit and hands the
+ * buffer back to the allocator. The buffer is released even when the
+ * flush fails so that it is not leaked; the failure of the flush is still
+ * propagated. The channel itself is not closed here.
+ *
+ * @throws IOException if flushing the buffered data fails
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ flush();
+ } finally {
+ // release regardless of the outcome of the flush
+ this.addrPosition = this.addrLimit = 0;
+ releaseBuffer();
+ }
+ }
+
+ /**
+ * Drop this stream's reference to the buffer and hand the buffer to the
+ * DirectBufferAllocator for release. Does nothing when the reference is
+ * already null.
+ */
+ protected final void releaseBuffer() {
+ final ByteBuffer toRelease = this.buffer;
+ if (toRelease == null) {
+ return;
+ }
+ // clear the field before releasing so a repeated call is a no-op
+ this.buffer = null;
+ DirectBufferAllocator.instance().release(toRelease);
+ }
+
+ /**
+ * Close the underlying channel in addition to flushing/clearing the buffer.
+ * <p>
+ * The channel is closed and the buffer released even when the flush
+ * fails, so neither is leaked on an I/O error. If both the flush and the
+ * close of the channel fail, the exception from the close is the one
+ * that propagates.
+ *
+ * @throws IOException if flushing or closing the channel fails
+ */
+ public void closeChannel() throws IOException {
+ try {
+ flush();
+ } finally {
+ this.addrPosition = this.addrLimit = 0;
+ try {
+ this.channel.close();
+ } finally {
+ // release regardless of the outcome of flush and channel close
+ releaseBuffer();
+ }
+ }
+ }
+
+ /**
+ * Write the buffered bytes to the channel, repeating the write until the
+ * buffer has been drained. Afterwards the buffer is cleared, or compacted
+ * when bytes are left over because a write threw, and the raw position
+ * and limit are recomputed from the buffer.
+ */
+ protected void flushBufferBlocking(final ByteBuffer buffer)
+ throws IOException {
+ // sync the buffer position from the raw position, then flip for draining
+ buffer.position(position());
+ buffer.flip();
+ try {
+ writeBuffer(buffer, this.channel);
+ while (buffer.hasRemaining()) {
+ writeBuffer(buffer, this.channel);
+ }
+ } finally {
+ if (!buffer.hasRemaining()) {
+ buffer.clear();
+ } else {
+ // keep the bytes that were not written at the front of the buffer
+ buffer.compact();
+ }
+ resetBufferPositions();
+ }
+ }
+
+ /** Write an integer in big-endian format on given off-heap address. */
+ protected static long putInt(long addrPos, final int v) {
+ if (UnsafeHolder.littleEndian) {
+ unsafe.putInt(null, addrPos, Integer.reverseBytes(v));
+ } else {
+ unsafe.putInt(null, addrPos, v);
+ }
+ return addrPos + 4;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/DirectBufferAllocator.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/DirectBufferAllocator.java
new file mode 100644
index 0000000..d589cde
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/DirectBufferAllocator.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.nio.ByteBuffer;
+import java.util.function.BiConsumer;
+
+import org.apache.geode.internal.shared.BufferAllocator;
+
+/**
+ * Generic implementation of {@link BufferAllocator} for direct ByteBuffers
+ * using Java NIO API.
+ */
+public class DirectBufferAllocator extends BufferAllocator {
+
+ /**
+ * Overhead of allocation on off-heap memory is kept fixed at 8 even though
+ * actual overhead will be dependent on the malloc implementation.
+ */
+ public static final int DIRECT_OBJECT_OVERHEAD = 8;
+
+ /**
+ * The owner of direct buffers that are stored in Regions and tracked in UMM.
+ */
+ public static final String DIRECT_STORE_OBJECT_OWNER =
+ "GEODE_DIRECT_STORE_OBJECTS";
+
+ public static final String DIRECT_STORE_DATA_FRAME_OUTPUT =
+ "DIRECT_" + STORE_DATA_FRAME_OUTPUT;
+
+ private static final DirectBufferAllocator globalInstance =
+ new DirectBufferAllocator();
+
+ private static volatile DirectBufferAllocator instance = globalInstance;
+
+ public static DirectBufferAllocator instance() {
+ return instance;
+ }
+
+ public DirectBufferAllocator initialize() {
+ DirectBufferAllocator.setInstance(this);
+ return this;
+ }
+
+ public static synchronized void setInstance(DirectBufferAllocator allocator) {
+ instance = allocator;
+ }
+
+ public static synchronized void resetInstance() {
+ instance = globalInstance;
+ }
+
+ protected DirectBufferAllocator() {}
+
+ /**
+ * Create (but do not throw) the exception reporting a low memory condition.
+ * The message carries both arguments so that the failure can be diagnosed
+ * from a log or stack trace.
+ *
+ * @param op name of the operation that could not get memory
+ * @param required the size that was asked for
+ * @return a new RuntimeException describing the failed request
+ */
+ public RuntimeException lowMemoryException(String op, int required) {
+ return new RuntimeException("DirectBufferAllocator: low memory in operation '"
+ + op + "' (required=" + required + ')');
+ }
+
+ public void changeOwnerToStorage(ByteBuffer buffer, int capacity,
+ BiConsumer<String, Object> changeOwner) {}
+
+ @Override
+ public ByteBuffer allocate(int size, String owner) {
+ return allocateForStorage(size);
+ }
+
+ /**
+ * Allocate through {@link #allocateForStorage(int)} and, if that fails
+ * with a RuntimeException while a non-global allocator is installed as the
+ * current instance, retry once on the global instance. The owner argument
+ * is not used by this implementation.
+ */
+ @Override
+ public ByteBuffer allocateWithFallback(int size, String owner) {
+ try {
+ return allocateForStorage(size);
+ } catch (RuntimeException re) {
+ if (instance() == globalInstance) {
+ // nothing to fall back to
+ throw re;
+ }
+ return globalInstance.allocateForStorage(size);
+ }
+ }
+
+ /**
+ * Allocate a direct ByteBuffer of the given size using the NIO API.
+ */
+ @Override
+ public ByteBuffer allocateForStorage(int size) {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ @Override
+ public void clearPostAllocate(ByteBuffer buffer) {
+ // clear till the capacity and not limit since former will be a factor
+ // of 8 and hence more efficient in Unsafe.setMemory
+ fill(buffer, (byte) 0, 0, buffer.capacity());
+ }
+
+ @Override
+ public Object baseObject(ByteBuffer buffer) {
+ return null;
+ }
+
+ @Override
+ public long baseOffset(ByteBuffer buffer) {
+ return UnsafeHolder.getDirectBufferAddress(buffer);
+ }
+
+ /**
+ * Make room for the required number of additional bytes beyond the current
+ * limit. When the capacity suffices only the limit is raised and the same
+ * buffer is returned; otherwise the contents up to the limit are copied
+ * into a newly allocated direct buffer of the same byte order, the old
+ * buffer is released and the new buffer is returned positioned at zero.
+ * The owner argument is not used by this implementation.
+ */
+ @Override
+ public ByteBuffer expand(ByteBuffer buffer, int required, String owner) {
+ assert required > 0 : "expand: unexpected required = " + required;
+
+ final int used = buffer.limit();
+ final int needed = used + required;
+ if (needed <= buffer.capacity()) {
+ // enough capacity already: just open up the limit
+ buffer.limit(needed);
+ return buffer;
+ }
+
+ final int newLength = BufferAllocator.expandedSize(used, required);
+ final ByteBuffer grown = ByteBuffer.allocateDirect(newLength)
+ .order(buffer.order());
+ buffer.rewind();
+ grown.put(buffer);
+ UnsafeHolder.releaseDirectBuffer(buffer);
+ grown.rewind(); // position at start as per the contract of expand
+ return grown;
+ }
+
+ @Override
+ public ByteBuffer fromBytesToStorage(byte[] bytes, int offset, int length) {
+ final ByteBuffer buffer = allocateForStorage(length);
+ buffer.put(bytes, offset, length);
+ // move to the start
+ buffer.rewind();
+ return buffer;
+ }
+
+ @Override
+ public ByteBuffer transfer(ByteBuffer buffer, String owner) {
+ if (buffer.isDirect()) {
+ return buffer;
+ } else {
+ return super.transfer(buffer, owner);
+ }
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ UnsafeHolder.releasePendingReferences();
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/FreeMemory.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/FreeMemory.java
new file mode 100644
index 0000000..686af0f
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/FreeMemory.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.geode.internal.shared.unsafe;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+ /**
+ * A Runnable that holds an address as its AtomicLong value and passes it
+ * to UnsafeWrapper.freeMemory when run. The value is swapped to zero with
+ * a compare-and-set before freeing, so an address that is already zero is
+ * never freed.
+ */
+ @SuppressWarnings("serial")
+ public abstract class FreeMemory extends AtomicLong implements Runnable {
+
+ protected FreeMemory(long address) {
+ super(address);
+ }
+
+ /**
+ * Claim the held address by swapping it to zero.
+ *
+ * @return the address if this call performed the swap, else zero
+ */
+ protected final long tryFree() {
+ // try hard to ensure freeMemory call happens only once
+ final long current = get();
+ if (current == 0) {
+ return 0L;
+ }
+ return compareAndSet(current, 0L) ? current : 0L;
+ }
+
+ protected abstract String objectName();
+
+ @Override
+ public void run() {
+ final long toFree = tryFree();
+ if (toFree == 0) {
+ // already claimed (or never set)
+ return;
+ }
+ new UnsafeWrapper().freeMemory(toFree);
+ }
+
+ /** Creates FreeMemory instances for a given address and size. */
+ public interface Factory {
+ FreeMemory newFreeMemory(long address, int size);
+ }
+ }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/UnsafeHolder.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/UnsafeHolder.java
new file mode 100644
index 0000000..dfa5a81
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/UnsafeHolder.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.geode.internal.shared.ChannelBufferInputStream;
+import org.apache.geode.internal.shared.ChannelBufferOutputStream;
+import org.apache.geode.internal.shared.InputStreamChannel;
+import org.apache.geode.internal.shared.OutputStreamChannel;
+
+/**
+ * Holder for static sun.misc.Unsafe instance and some convenience methods. Use
+ * other methods only if {@link UnsafeHolder#hasUnsafe()} returns true;
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public abstract class UnsafeHolder {
+
+ private static final class Wrapper {
+
+ static final sun.misc.Unsafe unsafe;
+ static final boolean unaligned;
+ static final Constructor<?> directBufferConstructor;
+ static final Field cleanerField;
+ static final Field cleanerRunnableField;
+ static final Object javaLangRefAccess;
+ static final Method handlePendingRefs;
+
+ static {
+ sun.misc.Unsafe v;
+ Constructor<?> dbConstructor;
+ Field cleaner;
+ Field runnableField = null;
+ try {
+ final ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
+ // try using "theUnsafe" field
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ v = (sun.misc.Unsafe) field.get(null);
+
+ // get the constructor of DirectByteBuffer that accepts a Runnable
+ Class<?> cls = Class.forName("java.nio.DirectByteBuffer",
+ false, systemLoader);
+ dbConstructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+ dbConstructor.setAccessible(true);
+
+ cleaner = cls.getDeclaredField("cleaner");
+ cleaner.setAccessible(true);
+
+ // search for the Runnable field in Cleaner
+ Class<?> runnableClass = Runnable.class;
+ Field[] fields = sun.misc.Cleaner.class.getDeclaredFields();
+ for (Field f : fields) {
+ if (runnableClass.isAssignableFrom(f.getType())) {
+ if (runnableField == null || f.getName().contains("thunk")) {
+ f.setAccessible(true);
+ runnableField = f;
+ }
+ }
+ }
+
+ Class<?> bitsClass = Class.forName("java.nio.Bits",
+ false, systemLoader);
+ Method m = bitsClass.getDeclaredMethod("unaligned");
+ m.setAccessible(true);
+ unaligned = Boolean.TRUE.equals(m.invoke(null));
+
+ } catch (LinkageError le) {
+ throw le;
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ if (v == null) {
+ throw new ExceptionInInitializerError("theUnsafe not found");
+ }
+ if (runnableField == null) {
+ throw new ExceptionInInitializerError(
+ "DirectByteBuffer cleaner thunk runnable field not found");
+ }
+ unsafe = v;
+ directBufferConstructor = dbConstructor;
+ cleanerField = cleaner;
+ cleanerRunnableField = runnableField;
+
+ Method m;
+ Object langRefAccess;
+ try {
+ m = sun.misc.SharedSecrets.class.getMethod("getJavaLangRefAccess");
+ m.setAccessible(true);
+ langRefAccess = m.invoke(null);
+ m = langRefAccess.getClass().getMethod("tryHandlePendingReference");
+ m.setAccessible(true);
+ m.invoke(langRefAccess);
+ } catch (Throwable ignored) {
+ langRefAccess = null;
+ m = null;
+ }
+ javaLangRefAccess = langRefAccess;
+ handlePendingRefs = m;
+ }
+
+ static void init() {}
+ }
+
+ private static final boolean hasUnsafe;
+ // Limit to the chunk copied per Unsafe.copyMemory call to allow for
+ // safepoint polling by JVM.
+ public static final boolean littleEndian =
+ ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+ static {
+ boolean v;
+ try {
+ Wrapper.init();
+ v = true;
+ } catch (LinkageError le) {
+ le.printStackTrace();
+ v = false;
+ }
+ hasUnsafe = v;
+ }
+
+ private UnsafeHolder() {
+ // no instance
+ }
+
+ public static boolean hasUnsafe() {
+ return hasUnsafe;
+ }
+
+ public static int getAllocationSize(int size) {
+ // round to word size
+ size = ((size + 7) >>> 3) << 3;
+ if (size > 0)
+ return size;
+ else
+ throw new BufferOverflowException();
+ }
+
+ public static ByteBuffer allocateDirectBuffer(int size,
+ FreeMemory.Factory factory) {
+ final int allocSize = getAllocationSize(size);
+ final ByteBuffer buffer = allocateDirectBuffer(
+ getUnsafe().allocateMemory(allocSize), allocSize, factory);
+ buffer.limit(size);
+ return buffer;
+ }
+
+ public static ByteBuffer allocateDirectBuffer(long address, int size,
+ FreeMemory.Factory factory) {
+ try {
+ ByteBuffer buffer = (ByteBuffer) Wrapper.directBufferConstructor
+ .newInstance(address, size);
+ if (factory != null) {
+ sun.misc.Cleaner cleaner = sun.misc.Cleaner.create(buffer,
+ factory.newFreeMemory(address, size));
+ Wrapper.cleanerField.set(buffer, cleaner);
+ }
+ return buffer;
+ } catch (Exception e) {
+ getUnsafe().throwException(e);
+ throw new IllegalStateException("unreachable");
+ }
+ }
+
+ public static long getDirectBufferAddress(ByteBuffer buffer) {
+ return ((sun.nio.ch.DirectBuffer) buffer).address();
+ }
+
+ // public static ByteBuffer reallocateDirectBuffer(ByteBuffer buffer,
+ // int newSize, Class<?> expectedClass, FreeMemory.Factory factory) {
+ // sun.nio.ch.DirectBuffer directBuffer = (sun.nio.ch.DirectBuffer)buffer;
+ // final long address = directBuffer.address();
+ // long newAddress = 0L;
+ //
+ // newSize = getAllocationSize(newSize);
+ // final sun.misc.Cleaner cleaner = directBuffer.cleaner();
+ // if (cleaner != null) {
+ // // reset the runnable to not free the memory and clean it up
+ // try {
+ // Object freeMemory = Wrapper.cleanerRunnableField.get(cleaner);
+ // if (expectedClass != null && (freeMemory == null ||
+ // !expectedClass.isInstance(freeMemory))) {
+ // throw new IllegalStateException("Expected class to be " +
+ // expectedClass.getName() + " in reallocate but was " +
+ // (freeMemory != null ? freeMemory.getClass().getName() : "null"));
+ // }
+ // // use the efficient realloc call if possible
+ // if ((freeMemory instanceof FreeMemory) &&
+ // ((FreeMemory)freeMemory).tryFree() != 0L) {
+ // newAddress = Wrapper.unsafe.reallocateMemory(address, newSize);
+ // }
+ // } catch (IllegalAccessException e) {
+ // // fallback to full copy
+ // }
+ // }
+ // if (newAddress == 0L) {
+ // if (expectedClass != null) {
+ // throw new IllegalStateException("Expected class to be " +
+ // expectedClass.getName() + " in reallocate but was non-runnable");
+ // }
+ // newAddress = getUnsafe().allocateMemory(newSize);
+ // Platform.copyMemory(null, address, null, newAddress,
+ // Math.min(newSize, buffer.limit()));
+ // }
+ // // clean only after copying is done
+ // if (cleaner != null) {
+ // cleaner.clean();
+ // cleaner.clear();
+ // }
+ // return allocateDirectBuffer(newAddress, newSize, factory)
+ // .order(buffer.order());
+ // }
+
+ // /**
+ // * Change the runnable field of Cleaner using the given factory. The "to"
+ // * argument specifies the target Runnable type that the factory will produce.
+ // * If the existing Runnable already matches "to" then it's a no-op.
+ // * <p>
+ // * The provided {@link BiConsumer} is used to apply any action before actually
+ // * changing the runnable field with the boolean argument indicating whether
+ // * the current field matches "from" or if it is something else.
+ // */
+ // public static void changeDirectBufferCleaner(
+ // ByteBuffer buffer, int size, Class<? extends FreeMemory> from,
+ // Class<? extends FreeMemory> to, FreeMemory.Factory factory,
+ // final BiConsumer<String, Object> changeOwner) throws IllegalAccessException {
+ // sun.nio.ch.DirectBuffer directBuffer = (sun.nio.ch.DirectBuffer)buffer;
+ // final sun.misc.Cleaner cleaner = directBuffer.cleaner();
+ // if (cleaner != null) {
+ // // change the runnable
+ // final Field runnableField = Wrapper.cleanerRunnableField;
+ // Object runnable = runnableField.get(cleaner);
+ // // skip if it already matches the target Runnable type
+ // if (!to.isInstance(runnable)) {
+ // if (changeOwner != null) {
+ // if (from.isInstance(runnable)) {
+ // changeOwner.accept(((FreeMemory)runnable).objectName(), runnable);
+ // } else {
+ // changeOwner.accept(null, runnable);
+ // }
+ // }
+ // Runnable newFree = factory.newFreeMemory(directBuffer.address(), size);
+ // runnableField.set(cleaner, newFree);
+ // }
+ // } else {
+ // throw new IllegalAccessException(
+ // "ByteBuffer without a Cleaner cannot be marked for storage");
+ // }
+ // }
+
+ /**
+ * Explicitly releases the passed ByteBuffer, assuming it is a direct one. Avoid
+ * calling this directly; use BufferAllocator.allocate/release where possible.
+ */
+ public static void releaseDirectBuffer(ByteBuffer buffer) {
+ sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
+ if (cleaner != null) {
+ cleaner.clean();
+ cleaner.clear();
+ }
+ buffer.rewind().limit(0);
+ }
+
+ public static void releasePendingReferences() {
+ // the commented-out code below is invoked by reflection instead, for platforms
+ // that may not have the requisite classes (e.g. Mac default JDK)
+ /*
+ * final sun.misc.JavaLangRefAccess refAccess =
+ * sun.misc.SharedSecrets.getJavaLangRefAccess();
+ * while (refAccess.tryHandlePendingReference()) ;
+ */
+ final Method handlePendingRefs = Wrapper.handlePendingRefs;
+ if (handlePendingRefs != null) {
+ try {
+ // retry while helping enqueue pending Cleaner Reference objects
+ // noinspection StatementWithEmptyBody
+ while ((Boolean) handlePendingRefs.invoke(Wrapper.javaLangRefAccess));
+ } catch (Exception ignored) {
+ // ignore any exceptions in releasing pending references
+ }
+ }
+ }
+
+ public static sun.misc.Unsafe getUnsafe() {
+ return Wrapper.unsafe;
+ }
+
+ public static boolean tryMonitorEnter(Object obj, boolean checkSelf) {
+ if (checkSelf && Thread.holdsLock(obj)) {
+ return false;
+ } else if (!getUnsafe().tryMonitorEnter(obj)) {
+ // try once more after a small wait
+ LockSupport.parkNanos(100L);
+ if (!getUnsafe().tryMonitorEnter(obj)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static void monitorEnter(Object obj) {
+ getUnsafe().monitorEnter(obj);
+ }
+
+ public static void monitorExit(Object obj) {
+ getUnsafe().monitorExit(obj);
+ }
+
+ @SuppressWarnings("resource")
+ public static InputStreamChannel newChannelBufferInputStream(
+ ReadableByteChannel channel, int bufferSize) throws IOException {
+ return (hasUnsafe
+ ? new ChannelBufferUnsafeInputStream(channel, bufferSize)
+ : new ChannelBufferInputStream(channel, bufferSize));
+ }
+
+ @SuppressWarnings("resource")
+ public static OutputStreamChannel newChannelBufferOutputStream(
+ WritableByteChannel channel, int bufferSize) throws IOException {
+ return (hasUnsafe
+ ? new ChannelBufferUnsafeOutputStream(channel, bufferSize)
+ : new ChannelBufferOutputStream(channel, bufferSize));
+ }
+
+ // @SuppressWarnings("resource")
+ // public static InputStreamChannel newChannelBufferFramedInputStream(
+ // ReadableByteChannel channel, int bufferSize) throws IOException {
+ // return (hasUnsafe
+ // ? new ChannelBufferUnsafeFramedInputStream(channel, bufferSize)
+ // : new ChannelBufferFramedInputStream(channel, bufferSize));
+ // }
+ //
+ // @SuppressWarnings("resource")
+ // public static OutputStreamChannel newChannelBufferFramedOutputStream(
+ // WritableByteChannel channel, int bufferSize) throws IOException {
+ // return (hasUnsafe
+ // ? new ChannelBufferUnsafeFramedOutputStream(channel, bufferSize)
+ // : new ChannelBufferFramedOutputStream(channel, bufferSize));
+ // }
+
+ /**
+ * Checks that the range described by {@code offset} and {@code len}
+ * doesn't exceed {@code arrayLength}.
+ */
+ public static void checkBounds(int arrayLength, int offset, int len) {
+ if ((offset | len) < 0 || offset > arrayLength ||
+ arrayLength - offset < len) {
+ throw new ArrayIndexOutOfBoundsException("Array index out of range: " +
+ "length=" + arrayLength + " offset=" + offset + " length=" + len);
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java
index 550def8..24e426a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java
@@ -53,4 +53,10 @@ public interface BaseMsgStreamer {
* @throws IOException on exception
*/
void close() throws IOException;
+
+
+ /**
+ * Called to free up resources used by this streamer after the streamer has produced its message.
+ */
+ void release();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
index abb7fdb..5c4024c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
@@ -27,7 +27,8 @@ public class Buffers {
/**
* A list of soft references to byte buffers.
*/
- private static final ConcurrentLinkedQueue bufferQueue = new ConcurrentLinkedQueue();
+ private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+ new ConcurrentLinkedQueue<>();
/**
* Should only be called by threads that have currently acquired send permission.
@@ -47,14 +48,14 @@ public class Buffers {
if (TCPConduit.useDirectBuffers) {
IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
// set
- BBSoftReference ref = (BBSoftReference) bufferQueue.poll();
+ BBSoftReference ref = bufferQueue.poll();
while (ref != null) {
ByteBuffer bb = ref.getBB();
if (bb == null) {
// it was garbage collected
int refSize = ref.consumeSize();
if (refSize > 0) {
- if (ref.getSend()) { // fix bug 46773
+ if (ref.getSend()) {
stats.incSenderBufferSize(-refSize, true);
} else {
stats.incReceiverBufferSize(-refSize, true);
@@ -68,7 +69,7 @@ public class Buffers {
// wasn't big enough so put it back in the queue
Assert.assertTrue(bufferQueue.offer(ref));
if (alreadySeen == null) {
- alreadySeen = new IdentityHashMap<BBSoftReference, BBSoftReference>();
+ alreadySeen = new IdentityHashMap<>();
}
if (alreadySeen.put(ref, ref) != null) {
// if it returns non-null then we have already seen this item
@@ -77,7 +78,7 @@ public class Buffers {
break;
}
}
- ref = (BBSoftReference) bufferQueue.poll();
+ ref = bufferQueue.poll();
}
result = ByteBuffer.allocateDirect(size);
} else {
@@ -116,14 +117,14 @@ public class Buffers {
}
}
- public static void initBufferStats(DMStats stats) { // fixes 46773
+ public static void initBufferStats(DMStats stats) {
if (TCPConduit.useDirectBuffers) {
@SuppressWarnings("unchecked")
- Iterator<BBSoftReference> it = (Iterator<BBSoftReference>) bufferQueue.iterator();
+ Iterator<BBSoftReference> it = bufferQueue.iterator();
while (it.hasNext()) {
BBSoftReference ref = it.next();
if (ref.getBB() != null) {
- if (ref.getSend()) { // fix bug 46773
+ if (ref.getSend()) {
stats.incSenderBufferSize(ref.getSize(), true);
} else {
stats.incReceiverBufferSize(ref.getSize(), true);
@@ -163,7 +164,7 @@ public class Buffers {
}
public ByteBuffer getBB() {
- return (ByteBuffer) super.get();
+ return super.get();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java
old mode 100755
new mode 100644
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f234ee7..7c2c7a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -20,6 +20,7 @@ import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -31,6 +32,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
@@ -44,10 +46,13 @@ import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
+import org.apache.geode.SerializationException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.distributed.DistributedMember;
@@ -81,6 +86,7 @@ import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.tcp.MsgReader.Header;
import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
+import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
/**
* Connection is a socket holder that sends and receives serialized message objects. A Connection
@@ -107,6 +113,12 @@ public class Connection implements Runnable {
public static final int MSG_HEADER_BYTES = 7;
/**
+ * The default wait, in nanoseconds, to use when waiting to read/write a channel
+ * (when there is no selector to signal)
+ */
+ public static final long PARK_NANOS_FOR_READ_WRITE = 100L;
+
+ /**
* Small buffer used for send socket buffer on receiver connections and receive buffer on sender
* connections.
*/
@@ -162,6 +174,10 @@ public class Connection implements Runnable {
}
}
+ public boolean isPooled() {
+ return this.pooled;
+ }
+
private int getP2PConnectTimeout() {
if (IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
return P2P_CONNECT_TIMEOUT;
@@ -211,8 +227,23 @@ public class Connection implements Runnable {
/** the non-NIO output stream */
OutputStream output;
+ /**
+ * The NIO stream message reader used by readAck. The "processNIOStream"
+ * thread uses its own local inputStream; in the readAck case that thread
+ * is spawned just to do the handshake and then terminates, while
+ * readAck waits for that initialization to happen (so there
+ * is no race in reading from the channel between the two threads).
+ */
+ private MsgChannelDestreamer ackInputStream;
+
+ /** Set to true when in message dispatch processing. */
+ private boolean inDispatch;
+
+ /** Set to true when this connection is a pooled one. */
+ private final boolean pooled;
+
/** output stream/channel lock */
- private final Object outLock = new Object();
+ private final StoppableReentrantLock outLock;
/** the ID string of the conduit (for logging) */
String conduitIdStr;
@@ -395,10 +426,10 @@ public class Connection implements Runnable {
boolean preserveOrder = false;
/** number of messages sent on this connection */
- private long messagesSent;
+ private AtomicLong messagesSent = new AtomicLong();
/** number of messages received on this connection */
- private long messagesReceived;
+ private AtomicLong messagesReceived = new AtomicLong();
/** unique ID of this connection (remote if isReceiver==true) */
private volatile long uniqueId;
@@ -536,7 +567,9 @@ public class Connection implements Runnable {
}
this.conduit = t.getConduit();
this.isReceiver = true;
+ this.pooled = false;
this.owner = t;
+ this.outLock = new StoppableReentrantLock(t.getConduit().getCancelCriterion());
this.socket = socket;
this.conduitIdStr = owner.getConduit().getSocketId().toString();
this.handshakeRead = false;
@@ -665,7 +698,7 @@ public class Connection implements Runnable {
throw new IOException(
String.format(
"Detected wrong version of GemFire product during handshake. Expected %s but found %s",
- new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(ver)}));
+ HANDSHAKE_VERSION, ver));
}
return ver;
}
@@ -886,6 +919,12 @@ public class Connection implements Runnable {
InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
+
+ // NIO stream expects network big-endian byte order
+ if (useNIOStream()) {
+ connectHandshake.buffer.order(ByteOrder.BIG_ENDIAN);
+ }
+
/*
* Note a byte of zero is always written because old products serialized a member id with always
* sends an ip address. My reading of the ip-address specs indicated that the first byte of a
@@ -917,7 +956,7 @@ public class Connection implements Runnable {
nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
}
- private void handshakeStream() throws IOException {
+ private void handshakeOldIO() throws IOException {
waitForAddressCompletion();
this.output = getSocket().getOutputStream();
@@ -962,7 +1001,7 @@ public class Connection implements Runnable {
if (useNIO()) {
handshakeNio();
} else {
- handshakeStream();
+ handshakeOldIO();
}
startReader(connTable); // this reader only reads the handshake and then exits
@@ -979,7 +1018,7 @@ public class Connection implements Runnable {
*/
protected static Connection createSender(final MembershipManager mgr, final ConnectionTable t,
final boolean preserveOrder, final DistributedMember remoteAddr, final boolean sharedResource,
- final long startTime, final long ackTimeout, final long ackSATimeout)
+ final boolean pooled, final long startTime, final long ackTimeout, final long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
boolean warningPrinted = false;
boolean success = false;
@@ -1063,7 +1102,7 @@ public class Connection implements Runnable {
// create connection
try {
conn = null;
- conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
+ conn = new Connection(t, preserveOrder, remoteAddr, sharedResource, pooled);
} catch (javax.net.ssl.SSLHandshakeException se) {
// no need to retry if certificates were rejected
throw se;
@@ -1183,7 +1222,8 @@ public class Connection implements Runnable {
* must accept us We will almost always send messages; small acks are received.
*/
private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
- boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
+ boolean sharedResource, boolean pooled)
+ throws IOException, DistributedSystemDisconnectedException {
// initialize a socket upfront. So that the
InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
@@ -1194,7 +1234,9 @@ public class Connection implements Runnable {
this.conduit = t.getConduit();
this.isReceiver = false;
this.owner = t;
+ this.outLock = new StoppableReentrantLock(t.getConduit().getCancelCriterion());
this.sharedResource = sharedResource;
+ this.pooled = pooled;
this.preserveOrder = preserveOrder;
setRemoteAddr(remoteAddr);
this.conduitIdStr = this.owner.getConduit().getSocketId().toString();
@@ -1208,7 +1250,60 @@ public class Connection implements Runnable {
InetSocketAddress addr =
new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
- if (useNIO()) {
+ if (useNIOStream()) {
+ SocketChannel channel = SocketChannel.open();
+ final Socket socket = channel.socket();
+ this.owner.addConnectingSocket(socket, addr.getAddress());
+ final int connectTime = getP2PConnectTimeout();
+ try {
+ setSendBufferSize(socket);
+ // disable Nagle since we are already buffering and a flush
+ // at this layer will always be followed by a read (either on this
+ // channel or some other with ReplyProcessor21)
+ socket.setTcpNoDelay(true);
+ // Non-blocking mode to write only as much as possible in one round.
+ // MsgChannelStreamer will move to writing to other servers,
+ // if remaining, for best performance.
+ if (pooled) {
+ channel.configureBlocking(false);
+ channel.connect(addr);
+ final long connectTimeNanos = connectTime * 1000000L;
+ long start = 0L;
+ while (!channel.finishConnect()) {
+ if (start == 0L && connectTimeNanos > 0) {
+ start = System.nanoTime();
+ }
+ LockSupport.parkNanos(PARK_NANOS_FOR_READ_WRITE);
+ if (connectTimeNanos > 0 &&
+ (System.nanoTime() - start) > connectTimeNanos) {
+ throw new ConnectException(
+ "Attempt to connect timed out after " + connectTime + "ms");
+ }
+ }
+ } else {
+ // configure as blocking channel for non-pooled connections
+ channel.configureBlocking(true);
+ socket.connect(addr, connectTime);
+ }
+ } catch (NullPointerException | IllegalStateException e) {
+ // bug #44469: for some reason NIO throws runtime exceptions
+ // instead of an IOException on timeouts
+ ConnectException c =
+ new ConnectException("Attempt to connect timed out after " + connectTime + "ms");
+ c.initCause(e);
+ // prevent a hot loop by sleeping a little bit
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ throw c;
+ } finally {
+ this.owner.removeConnectingSocket(socket);
+ }
+ this.socket = socket;
+
+ } else if (useNIO()) {
SocketChannel channel = SocketChannel.open();
this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
try {
@@ -1672,7 +1767,9 @@ public class Connection implements Runnable {
ConnectionTable.threadWantsSharedResources();
makeReaderThread(this.isReceiver);
try {
- if (useNIO()) {
+ if (useNIOStream()) {
+ processNIOStream();
+ } else if (useNIO()) {
runNioReader();
} else {
runOioReader();
@@ -1932,6 +2029,10 @@ public class Connection implements Runnable {
private void closeAllMsgDestreamers() {
synchronized (this.destreamerLock) {
+ final MsgChannelDestreamer inputStream = this.ackInputStream;
+ if (inputStream != null) {
+ inputStream.close();
+ }
if (this.idleMsgDestreamer != null) {
this.idleMsgDestreamer.close();
this.idleMsgDestreamer = null;
@@ -2315,7 +2416,7 @@ public class Connection implements Runnable {
}
// } else {
// ConnectionTable.threadWantsSharedResources();
- // logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
+ // logger.debug("thread-owned receiver with domino count of " + dominoNumber + "
// will prefer shared sockets");
}
this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
@@ -2528,7 +2629,7 @@ public class Connection implements Runnable {
}
}
if (cacheContentChanges) {
- messagesSent++;
+ messagesSent.incrementAndGet();
}
} finally {
accessed();
@@ -3281,6 +3382,10 @@ public class Connection implements Runnable {
} finally {
stats.endSocketWrite(true, start, amtWritten, 0);
// this.writerThread = null;
+ if (amtWritten == 0) {
+ // wait for a bit before retrying
+ LockSupport.parkNanos(PARK_NANOS_FOR_READ_WRITE);
+ }
}
} while (buffer.remaining() > 0);
} // synchronized
@@ -3305,10 +3410,10 @@ public class Connection implements Runnable {
/**
* stateLock is used to synchronize state changes.
*/
- private final Object stateLock = new Object();
+ final Object stateLock = new Object();
/** for timeout processing, this is the current state of the connection */
- private byte connectionState = STATE_IDLE;
+ byte connectionState = STATE_IDLE;
/* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
/** the connection is idle, but may be in use */
@@ -3349,8 +3454,22 @@ public class Connection implements Runnable {
MsgReader msgReader = null;
DMStats stats = owner.getConduit().getStats();
final Version version = getRemoteVersion();
+ CancelCriterion cancelCriterion = owner.getConduit().getCancelCriterion();
try {
- if (useNIO()) {
+ if (useNIOStream()) {
+ // use the normal message reader with fast dispatch route
+ synchronized (this.destreamerLock) {
+ // create a shared MsgChannelDestreamer on first use
+ MsgChannelDestreamer inputStream = this.ackInputStream;
+ if (inputStream == null) {
+ inputStream = new MsgChannelDestreamer(this,
+ getSocket().getChannel(), owner.getConduit().tcpBufferSize);
+ this.ackInputStream = inputStream;
+ }
+ readAndDispatchMessage(inputStream, processor, stats, cancelCriterion);
+ return;
+ }
+ } else if (useNIO()) {
msgReader = new NIOMsgReader(this, version);
} else {
msgReader = new OioMsgReader(this, version);
@@ -3374,20 +3493,7 @@ public class Connection implements Runnable {
releaseMsgDestreamer(header.getNioMessageId(), destreamer);
len = destreamer.size();
}
- // I'd really just like to call dispatchMessage here. However,
- // that call goes through a bunch of checks that knock about
- // 10% of the performance. Since this direct-ack stuff is all
- // about performance, we'll skip those checks. Skipping them
- // should be legit, because we just sent a message so we know
- // the member is already in our view, etc.
- ClusterDistributionManager dm = (ClusterDistributionManager) owner.getDM();
- msg.setBytesRead(len);
- msg.setSender(remoteAddr);
- stats.incReceivedMessages(1L);
- stats.incReceivedBytes(msg.getBytesRead());
- stats.incMessageChannelTime(msg.resetTimestamp());
- msg.process(dm, processor);
- // dispatchMessage(msg, len, false);
+ fastDispatchMessage(msg, len, processor, stats);
} catch (MemberShunnedException e) {
// do nothing
} catch (SocketTimeoutException timeout) {
@@ -3407,10 +3513,10 @@ public class Connection implements Runnable {
throw new ConnectionException(
String.format("Unable to read direct ack because: %s", e));
} catch (ConnectionException e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+ cancelCriterion.checkCancelInProgress(e);
throw e;
} catch (Exception e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+ cancelCriterion.checkCancelInProgress(e);
if (!isSocketClosed()) {
logger.fatal("ack read exception", e);
}
@@ -3438,6 +3544,19 @@ public class Connection implements Runnable {
}
}
+ private void fastDispatchMessage(final ReplyMessage msg,
+ final int bytesRead, final DirectReplyProcessor processor,
+ final DMStats stats) {
+ DistributionManager dm = owner.getDM();
+ msg.setBytesRead(bytesRead);
+ msg.setSender(remoteAddr);
+ stats.incReceivedMessages(1L);
+ stats.incReceivedBytes(bytesRead);
+ stats.incMessageChannelTime(msg.resetTimestamp());
+ msg.process(dm, processor);
+ }
+
+
/**
* processes the current NIO buffer. If there are complete messages in the buffer, they are
* deserialized and passed to TCPConduit for further processing
@@ -3517,29 +3636,15 @@ public class Connection implements Runnable {
throw td;
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
SystemFailure.checkFailure();
logger.fatal("Throwable dispatching message", t);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
SystemFailure.checkFailure();
sendFailureReply(ReplyProcessor21.getMessageRPId(),
"Error deserializing message", t,
@@ -3602,15 +3707,8 @@ public class Connection implements Runnable {
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
throw err;
} catch (Throwable ex) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
SystemFailure.checkFailure();
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
@@ -3641,15 +3739,8 @@ public class Connection implements Runnable {
throw td;
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
SystemFailure.checkFailure();
logger.fatal("Throwable dispatching message", t);
}
@@ -3690,15 +3781,8 @@ public class Connection implements Runnable {
throw td;
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
SystemFailure.checkFailure();
logger.fatal("Throwable deserializing P2P handshake reply",
t);
@@ -3780,10 +3864,6 @@ public class Connection implements Runnable {
// buffer.
setSendBufferSize(this.socket);
}
- // String name = owner.getDM().getConfig().getName();
- // if (name == null) {
- // name = "pid="+OSProcess.getId();
- // }
setThreadName(dominoNumber);
} catch (Exception e) {
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
@@ -3853,6 +3933,514 @@ public class Connection implements Runnable {
}
}
+ /**
+  * Reader loop for a connection whose socket channel is used in blocking mode.
+  * <p>
+  * The channel is switched to blocking mode and wrapped in a {@link MsgChannelDestreamer};
+  * {@link #processMessage(MsgChannelDestreamer)} is then invoked repeatedly until the
+  * connection is stopped, a cancel is in progress, the handshake has been read or cancelled
+  * on a non-receiver connection, or a read fails. Read failures normally request that the
+  * connection be closed before this method returns.
+  */
+ private void processNIOStream() {
+   SocketChannel channel;
+   try {
+     Socket socket = getSocket();
+     channel = socket.getChannel();
+     socket.setTcpNoDelay(true);
+     channel.configureBlocking(true);
+   } catch (ClosedChannelException e) {
+     // bug 37693: the channel was asynchronously closed. Our work
+     // is done.
+     try {
+       requestClose("channel closing: " + e);
+     } catch (Exception ignore) {
+       // best effort: the connection is going away anyway
+     }
+     return; // exit loop and thread
+   } catch (IOException ex) {
+     if (stopped || owner.getConduit().getCancelCriterion()
+         .cancelInProgress() != null) {
+       try {
+         requestClose("shutting down");
+       } catch (Exception ignore) {
+         // best effort: the connection is going away anyway
+       }
+       return; // bug37520: exit loop (and thread)
+     }
+     logger.warn("failed to set channel to blocking mode: " + ex);
+     this.readerShuttingDown = true;
+     try {
+       requestClose("failed to set channel to blocking mode: " + ex);
+     } catch (Exception ignore) {
+       // best effort: the connection is going away anyway
+     }
+     return;
+   }
+
+   if (!stopped) {
+     logger.debug("Starting " + p2pReaderName());
+   }
+   // we should not change the state of the connection if we are a handshake
+   // reader thread as there is a race between this thread and the application
+   // thread doing direct ack
+   // fix for #40869
+   boolean isHandShakeReader = false;
+   MsgChannelDestreamer inputStream = null;
+   try {
+     inputStream = new MsgChannelDestreamer(
+         this, channel, this.owner.getConduit().tcpBufferSize);
+     for (;;) {
+       if (stopped) {
+         break;
+       }
+       if (SystemFailure.getFailure() != null) {
+         // Allocate no objects here!
+         Socket s = this.socket;
+         if (s != null) {
+           try {
+             s.close();
+           } catch (IOException e) {
+             // don't care
+           }
+         }
+         SystemFailure.checkFailure(); // throws
+       }
+       if (this.owner.getConduit().getCancelCriterion()
+           .cancelInProgress() != null) {
+         break;
+       }
+
+       synchronized (stateLock) {
+         connectionState = Connection.STATE_IDLE;
+       }
+       try {
+         // no locking required here since only one thread will ever
+         // do processing (for readAck case this thread will terminate
+         // after handshake since "isReceiver" will be false)
+         processMessage(inputStream);
+         if (!this.isReceiver
+             && (this.handshakeRead || this.handshakeCancelled)) {
+           if (logger.isDebugEnabled()) {
+             if (this.handshakeRead) {
+               logger.debug(p2pReaderName() +
+                   " handshake has been read " + this);
+             } else {
+               logger.debug(p2pReaderName() +
+                   " handshake has been cancelled " + this);
+             }
+           }
+           isHandShakeReader = true;
+           // Once we have read the handshake the reader can go away
+           break;
+         }
+       } catch (CancelException e) {
+         if (logger.isDebugEnabled()) {
+           logger.debug(p2pReaderName() + " Terminated <" + this
+               + "> due to cancellation:", e);
+         }
+         this.readerShuttingDown = true;
+         try {
+           requestClose("cache closed in channel read: " + e);
+         } catch (Exception ignored) {
+           // best effort: the reader is terminating
+         }
+         return;
+       } catch (ClosedChannelException | EOFException e) {
+         this.readerShuttingDown = true;
+         try {
+           requestClose("channel closing: " + e);
+         } catch (Exception ignored) {
+           // best effort: the reader is terminating
+         }
+         return;
+       } catch (IOException e) {
+         // getMessage() may legitimately be null; guard every use so that an NPE
+         // thrown from this handler cannot skip the requestClose() below
+         final String ioMessage = e.getMessage();
+         if (!isSocketClosed()
+             // needed for Solaris jdk 1.4.2_08
+             && !"Socket closed".equalsIgnoreCase(ioMessage)) {
+           if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
+             logger.debug(p2pReaderName() + " io exception for " + this, e);
+           }
+           if (ioMessage != null && ioMessage.contains(
+               "interrupted by a call to WSACancelBlockingCall")) {
+             logger.warn(p2pReaderName() +
+                 " received unexpected WSACancelBlockingCall exception, " +
+                 "which may result in a hang - bug 45675");
+           }
+         }
+         this.readerShuttingDown = true;
+         try {
+           requestClose("I/O exception in channel read: " + e);
+         } catch (Exception ignored) {
+           // best effort: the reader is terminating
+         }
+         return;
+
+       } catch (Throwable t) {
+         Error err;
+         if (t instanceof Error && SystemFailure.isJVMFailureError(
+             err = (Error) t)) {
+           SystemFailure.initiateFailure(err);
+           throw err;
+         }
+         SystemFailure.checkFailure();
+
+         // bug 37101
+         this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+         if (!stopped && !isSocketClosed()) {
+           logger.warn("Exception in channel read", t);
+         }
+         this.readerShuttingDown = true;
+         try {
+           requestClose("exception in channel read: " + t);
+         } catch (Exception ignored) {
+           // best effort: the reader is terminating
+         }
+         return;
+       }
+     } // for
+   } finally {
+     if (inputStream != null) {
+       inputStream.close();
+     }
+
+     // a handshake reader must not touch the state; see the note above (#40869)
+     if (!isHandShakeReader) {
+       synchronized (stateLock) {
+         connectionState = STATE_IDLE;
+       }
+     }
+     if (logger.isDebugEnabled()) {
+       logger.debug(p2pReaderName() + " runNioReader terminated "
+           + " id=" + conduitIdStr + " from " + remoteAddr);
+     }
+   }
+ }
+
+ /**
+  * Checks the given P2P message type with {@code validMsgType}. An unknown type is logged
+  * at fatal level together with a stack trace, the reader is flagged as shutting down and
+  * a close of this connection is requested.
+  *
+  * @param messageType the message type byte to check
+  * @return {@code true} if the type is valid, {@code false} if a close was requested
+  */
+ private boolean validateMessageType(byte messageType) {
+   if (!validMsgType(messageType)) {
+     logger.fatal("Unknown P2P message type: " + messageType, new Exception("stack trace"));
+     this.readerShuttingDown = true;
+     requestClose("Unknown P2P message type: " + messageType);
+     return false;
+   }
+   return true;
+ }
+
+ /**
+  * Reads and processes the handshake from the channel stream.
+  * <p>
+  * The header (length/version word, message type, two skipped message-id bytes) is read
+  * first. On a non-receiver connection the handshake <em>reply</em> is then read and the
+  * handshake waiter is notified; on a receiver connection the peer's handshake is read, the
+  * connection fields are populated from it and an OK reply is sent.
+  *
+  * @param input the stream to read the handshake from
+  * @return {@code true} if the handshake was processed; {@code false} if it was rejected or
+  *         failed (in most of those paths a close of this connection has been requested)
+  * @throws IOException if the handshake header cannot be read
+  */
+ private boolean processHandshake(MsgChannelDestreamer input)
+     throws IOException {
+   // read the header in handshake
+   int messageLength = input.readInt();
+   calcHdrVersion(messageLength);
+   byte messageType = input.readByte();
+   // skip message ID
+   input.skipBytes(2);
+   directAck = (messageType & DIRECT_ACK_BIT) != 0;
+   if (directAck) {
+     messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+   }
+   // Following validation fixes bug 31145
+   if (!validateMessageType(messageType)) {
+     return false;
+   }
+   if (!this.isReceiver) {
+     try {
+       this.replyCode = input.readUnsignedByte();
+       if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
+         this.asyncDistributionTimeout = input.readInt();
+         this.asyncQueueTimeout = input.readInt();
+         // the wire value is in megabytes; widen before multiplying to avoid int overflow
+         this.asyncMaxQueueSize = (long) input.readInt() * (1024 * 1024);
+         if (this.asyncDistributionTimeout != 0 && logger.isInfoEnabled()) {
+           logger.info("{} async configuration received {}.",
+               p2pReaderName(),
+               " asyncDistributionTimeout=" + this.asyncDistributionTimeout
+                   + " asyncQueueTimeout=" + this.asyncQueueTimeout
+                   + " asyncMaxQueueSize="
+                   + (this.asyncMaxQueueSize / (1024 * 1024)));
+         }
+         // read the product version ordinal for on-the-fly serialization
+         // transformations (for rolling upgrades)
+         this.remoteVersion = Version.readVersion(input, true);
+       }
+     } catch (Exception e) {
+       owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+       logger.fatal("Error deserializing handshake reply", e);
+       this.readerShuttingDown = true;
+       requestClose("Error deserializing handshake reply");
+       return false;
+     } catch (ThreadDeath td) {
+       throw td;
+     } catch (Error err) {
+       if (SystemFailure.isJVMFailureError(err)) {
+         SystemFailure.initiateFailure(err);
+         throw err;
+       }
+       SystemFailure.checkFailure();
+       logger.fatal("Error deserializing handshake reply", err);
+       this.readerShuttingDown = true;
+       requestClose("Error deserializing handshake reply");
+       return false;
+     }
+     if (this.replyCode != REPLY_CODE_OK &&
+         this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
+       // Format eagerly with String.format: the template uses %s placeholders, which the
+       // logger's own parameter substitution would not expand.
+       // NOTE(review): nioMessageLength is not assigned anywhere in this method - confirm it
+       // holds a meaningful value on this code path.
+       final String err = String.format(
+           "Unknown handshake reply code: %s nioMessageLength: %s",
+           Integer.valueOf(this.replyCode), Integer.valueOf(nioMessageLength));
+       if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+         logger.debug(err + " (peer probably departed ungracefully)");
+       } else {
+         logger.fatal(err);
+       }
+       this.readerShuttingDown = true;
+       requestClose(err);
+       return false;
+     }
+     notifyHandshakeWaiter(true);
+   } else {
+     try {
+       byte b = input.readByte();
+       if (b != 0) {
+         throw new IllegalStateException(
+             String.format(
+                 "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
+                 Byte.valueOf(b)));
+       }
+       byte handshakeByte = input.readByte();
+       if (handshakeByte != HANDSHAKE_VERSION) {
+         throw new IllegalStateException(
+             String.format(
+                 "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
+                 Byte.valueOf(HANDSHAKE_VERSION), Byte.valueOf(handshakeByte)));
+       }
+       InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(input);
+       setRemoteAddr(remote);
+       this.sharedResource = input.readBoolean();
+       this.preserveOrder = input.readBoolean();
+       this.uniqueId = input.readLong();
+       // read the product version ordinal for on-the-fly serialization
+       // transformations (for rolling upgrades)
+       this.remoteVersion = Version.readVersion(input, true);
+       // the domino number is always present on the wire but ignored for shared connections
+       int dominoNumber = input.readInt();
+       if (this.sharedResource) {
+         dominoNumber = 0;
+       }
+       dominoCount.set(dominoNumber);
+       if (!this.sharedResource) {
+         if (tipDomino()) {
+           logger.info(
+               "thread owned receiver forcing itself to send on thread owned sockets");
+         } else if (dominoNumber < 2) {
+           ConnectionTable.threadWantsOwnResources();
+           logger.debug("thread-owned receiver with domino count of " +
+               dominoNumber + " will prefer sending on thread-owned sockets");
+         } else {
+           ConnectionTable.threadWantsSharedResources();
+           logger.debug("thread-owned receiver with domino count of " +
+               dominoNumber + " will prefer shared sockets");
+         }
+       }
+       Thread.currentThread().setName("P2P message reader for " +
+           this.remoteAddr + " " + (this.sharedResource ? "" : "un") +
+           "shared " + (this.preserveOrder ? "" : "un") + "ordered uid=" +
+           uniqueId + (dominoNumber > 0 ? (" dom #" + dominoNumber) : ""));
+     } catch (Exception e) {
+       // bug 37101
+       owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+       logger.fatal("Error deserializing p2p handshake message", e);
+       this.readerShuttingDown = true;
+       requestClose("Error deserializing p2p handshake message");
+       return false;
+     }
+     if (logger.isDebugEnabled()) {
+       logger.debug("P2P handshake remote address is " + this.remoteAddr +
+           (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+     }
+     try {
+       String authInit = System.getProperty(
+           DistributionConfigImpl.SECURITY_SYSTEM_PREFIX +
+               DistributionConfig.SECURITY_PEER_AUTH_INIT_NAME);
+       boolean isSecure = authInit != null && authInit.length() != 0;
+
+       if (isSecure) {
+         if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
+           sendOKHandshakeReply(); // fix for bug 33224
+           notifyHandshakeWaiter(true);
+         } else {
+           // ARB: check if we need notifyHandshakeWaiter() call.
+           notifyHandshakeWaiter(false);
+           logger.warn("Timeout during membership check in " + p2pReaderName());
+           return false;
+         }
+       } else {
+         sendOKHandshakeReply(); // fix for bug 33224
+         try {
+           notifyHandshakeWaiter(true);
+         } catch (Exception e) {
+           logger.warn("Uncaught exception from listener", e);
+         }
+       }
+     } catch (IOException ex) {
+       final String err = "Failed sending p2p handshake reply";
+       if (logger.isDebugEnabled()) {
+         logger.debug(err, ex);
+       }
+       this.readerShuttingDown = true;
+       requestClose(err + ": " + ex);
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Reads one unit of input from the channel stream: the handshake if it has not been read
+  * yet, otherwise the next message, which is deserialized and handed on for dispatch.
+  * <p>
+  * The "inLock" must be held when invoking this method.
+  *
+  * @param input the stream to read from
+  * @throws ConnectionException rethrown unchanged from message processing
+  * @throws IOException rethrown unchanged from reading the stream
+  */
+ private void processMessage(final MsgChannelDestreamer input)
+     throws ConnectionException, IOException {
+   final TCPConduit conduit = this.owner.getConduit();
+   final CancelCriterion cancelCriterion = conduit.getCancelCriterion();
+   final DMStats stats = conduit.getStats();
+
+   if (!connected) {
+     return;
+   }
+   cancelCriterion.checkCancelInProgress(null);
+
+   if (!handshakeRead) {
+     // the HANDSHAKE still uses the old header; the next call handles the next message
+     processHandshake(input);
+     return;
+   }
+
+   try {
+     readAndDispatchMessage(input, null, stats, cancelCriterion);
+   } catch (IOException | ConnectionException e) {
+     // connection-level problems are the caller's business
+     throw e;
+   } catch (Throwable t) {
+     Error err;
+     if (t instanceof Error && SystemFailure.isJVMFailureError(
+         err = (Error) t)) {
+       SystemFailure.initiateFailure(err);
+       throw err;
+     }
+     SystemFailure.checkFailure();
+
+     // #49309: prefer reporting a cancellation over the raw failure
+     if (!(t instanceof CancelException)) {
+       final String reason = cancelCriterion.cancelInProgress();
+       final Throwable cancelledEx =
+           reason != null ? cancelCriterion.generateCancelledException(t) : null;
+       if (cancelledEx != null) {
+         t = cancelledEx;
+       } else {
+         // look for a CancelException wrapped somewhere in the cause chain
+         for (Throwable nested = t.getCause(); nested != null; nested = nested.getCause()) {
+           if (nested instanceof CancelException) {
+             t = nested;
+             break;
+           }
+         }
+       }
+     }
+
+     final String failureText;
+     if (inDispatch) {
+       failureText = "Uncaught exception while dispatching message";
+     } else {
+       failureText = "Error deserializing message";
+     }
+     sendFailureReply(ReplyProcessor21.getMessageRPId(),
+         failureText, t, directAck);
+     if (t instanceof ThreadDeath) {
+       throw (ThreadDeath) t;
+     }
+     if (t instanceof CancelException) {
+       // A CacheClosedException raised during dispatch is only logged; see bug 43543
+       if (!(inDispatch && t instanceof CacheClosedException)) {
+         throw (CancelException) t;
+       }
+     } else if (!inDispatch) {
+       // connection must be closed if there was trouble during deserialization
+       // else there can be dangling data on the connection causing subsequent
+       // reads to fail and processing to hang on the sender side (SNAP-1488)
+       logger.fatal(failureText, t);
+       if (t instanceof RuntimeException) {
+         throw (RuntimeException) t;
+       }
+       if (t instanceof Error) {
+         throw (Error) t;
+       }
+       throw new SerializationException(t.getMessage(), t);
+     }
+     logger.fatal(failureText, t);
+   } finally {
+     ReplyProcessor21.clearMessageRPId();
+   }
+ }
+
+ /**
+ * Reads a single message from the stream and dispatches it.
+ * <p>
+ * The message type byte is read first; its direct-ack bit is recorded in {@code directAck}
+ * and cleared. The message is then deserialized with
+ * {@code InternalDataSerializer.readDSFID} and passed to {@code dispatchMessage} when no
+ * direct reply processor is given, or to {@code fastDispatchMessage} (as a
+ * {@code ReplyMessage}) when one is. {@code inDispatch} is false while deserializing and
+ * true from then on.
+ *
+ * @param input the stream to read the message from
+ * @param directProcessor direct reply processor to hand the message to, or null to use the
+ * normal dispatch path (message type validation is only done in that case)
+ * @param stats receives the deserialization timing
+ * @param cancelCriterion consulted when normal dispatch fails with an exception
+ */
+ private void readAndDispatchMessage(final MsgChannelDestreamer input,
+ final DirectReplyProcessor directProcessor, final DMStats stats,
+ final CancelCriterion cancelCriterion) throws ConnectionException,
+ IOException, ClassNotFoundException {
+
+ byte messageType = input.readByte();
+ directAck = (messageType & DIRECT_ACK_BIT) != 0;
+ if (directAck) {
+ messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+ }
+ // Following validation fixes bug 31145
+ if (directProcessor == null && !validateMessageType(messageType)) {
+ return;
+ }
+
+ // mark that a new message is started primarily for getting
+ // total message length and writing that to stats etc
+ input.startNewMessage();
+
+ input.setRemoteVersion(remoteVersion);
+ // still deserializing: a failure from here until the flag flips is a read error
+ inDispatch = false;
+ DistributionMessage msg;
+
+ ReplyProcessor21.initMessageRPId();
+ // add serialization stats
+ long startSer = stats.startMsgDeserialization();
+ msg = (DistributionMessage) InternalDataSerializer.readDSFID(input);
+ stats.endMsgDeserialization(startSer);
+ /*
+ * (streaming model can read next message on channel immediately)
+ * final int remaining = input.available();
+ * if (remaining != 0) {
+ * logger.warning(LocalizedStrings
+ * .Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
+ * new Object[]{msg, remaining});
+ * // move buffer point to the end in any case else will be cascading errors
+ * input.skipBytes(remaining);
+ * }
+ */
+ final int messageLength = input.getLastMessageLength();
+ // the message is fully read; failures from here on are dispatch errors
+ inDispatch = true;
+ if (directProcessor == null) {
+ try {
+ if (!dispatchMessage(msg, messageLength, directAck)) {
+ directAck = false;
+ }
+ accessed();
+ } catch (MemberShunnedException e) {
+ directAck = false; // don't respond (bug39117)
+ } catch (Exception de) {
+ cancelCriterion.checkCancelInProgress(de);
+ logger.fatal("Error dispatching message", de);
+ }
+ } else {
+ // pass through exceptions to caller
+ fastDispatchMessage((ReplyMessage) msg, messageLength,
+ directProcessor, stats);
+ }
+ }
+
+ /** Atomically adds one to the count of messages received on this connection. */
+ public final void incMessagesReceived() {
+   messagesReceived.incrementAndGet();
+ }
+
private void setThreadName(int dominoNumber) {
Thread.currentThread().setName("P2P message reader for " + this.remoteAddr + " "
+ (this.sharedResource ? "" : "un") + "shared" + " " + (this.preserveOrder ? "" : "un")
@@ -3899,7 +4487,7 @@ public class Connection implements Runnable {
return true;
} finally {
if (msg.containsRegionContentChange()) {
- messagesReceived++;
+ messagesReceived.incrementAndGet();
}
}
}
@@ -3908,6 +4496,10 @@ public class Connection implements Runnable {
return this.conduit;
}
+ /** @return the {@code outLock} of this connection */
+ protected StoppableReentrantLock getOutLock() {
+   return this.outLock;
+ }
+
protected Socket getSocket() throws SocketException {
// fix for bug 37286
Socket result = this.socket;
@@ -3988,14 +4580,14 @@ public class Connection implements Runnable {
* answers the number of messages received by this connection
*/
protected long getMessagesReceived() {
- return messagesReceived;
+ return messagesReceived.get();
}
/**
* answers the number of messages sent on this connection
*/
protected long getMessagesSent() {
- return messagesSent;
+ return messagesSent.get();
}
public void acquireSendPermission() throws ConnectionException {
@@ -4008,23 +4600,28 @@ public class Connection implements Runnable {
return;
}
boolean interrupted = false;
- try {
- for (;;) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- try {
- this.senderSem.acquire();
- break;
- } catch (InterruptedException ex) {
- interrupted = true;
+ final boolean useNIOStream = useNIOStream();
+ if (!useNIOStream) {
+ try {
+ for (;;) {
+ this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ try {
+ this.senderSem.acquire();
+ break;
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ } // for
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
- } // for
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
}
}
if (!this.connected) {
- this.senderSem.release();
+ if (!useNIOStream) {
+ this.senderSem.release();
+ }
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
throw new ConnectionException(
"connection is closed");
@@ -4032,7 +4629,7 @@ public class Connection implements Runnable {
}
public void releaseSendPermission() {
- if (isReaderThread()) {
+ if (isReaderThread() || getConduit().useNIOStream()) {
return;
}
this.senderSem.release();
@@ -4073,4 +4670,12 @@ public class Connection implements Runnable {
}
return this.useNIO;
}
+
+ /** @return whether the owning table's conduit reports that NIO streaming is in use */
+ boolean useNIOStream() {
+   final TCPConduit tcpConduit = this.owner.getConduit();
+   return tcpConduit.useNIOStream();
+ }
+
+ /** Atomically adds one to the count of messages sent on this connection. */
+ void incMessagesSent() {
+   this.messagesSent.incrementAndGet();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 8bf3b3a..2638b39 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -32,11 +32,15 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -58,7 +62,7 @@ import org.apache.geode.internal.net.SocketCloser;
* @since GemFire 2.1
*/
public class ConnectionTable {
- private static final Logger logger = LogService.getLogger();
+ static final Logger logger = LogService.getLogger();
/** warning when descriptor limit reached */
private static boolean ulimitWarningIssued;
@@ -72,7 +76,8 @@ public class ConnectionTable {
* Used for messages whose order must be preserved Only connections used for sending messages, and
* receiving acks, will be put in this map.
*/
- protected final Map orderedConnectionMap = new ConcurrentHashMap();
+ protected final ConcurrentMap<InternalDistributedMember, Object> orderedConnectionMap =
+ new ConcurrentHashMap<>();
/**
* ordered connections local to this thread. Note that accesses to the resulting map must be
@@ -100,13 +105,14 @@ public class ConnectionTable {
* threadOrderedConnMap. The value is an ArrayList since we can have any number of connections
* with the same key.
*/
- private ConcurrentMap threadConnectionMap;
+ private ConcurrentMap<InternalDistributedMember, List> threadConnectionMap;
/**
* Used for all non-ordered messages. Only connections used for sending messages, and receiving
* acks, will be put in this map.
*/
- protected final Map unorderedConnectionMap = new ConcurrentHashMap();
+ protected final ConcurrentMap<InternalDistributedMember, Object> unorderedConnectionMap =
+ new ConcurrentHashMap<>();
/**
* Used for all accepted connections. These connections are read only; we never send messages,
@@ -144,13 +150,51 @@ public class ConnectionTable {
*
* TODO this assumes no more than one instance is created at a time?
*/
- private static final AtomicReference lastInstance = new AtomicReference();
+ private static final AtomicReference<ConnectionTable> lastInstance = new AtomicReference<>();
/**
* A set of sockets that are in the process of being connected
*/
private Map connectingSockets = new HashMap();
+ /**
+  * Key of the connection pool. Two keys are equal when their members are equal; the start
+  * time and the two ack timeouts are carried along for the connection factory but take no
+  * part in {@link #equals(Object)} or {@link #hashCode()}.
+  */
+ private static final class ConnKey {
+   final DistributedMember member;
+   final long startTime;
+   final long ackTimeout;
+   final long ackSATimeout;
+
+   /** Creates a key with all timing values set to zero. */
+   ConnKey(DistributedMember member) {
+     this(member, 0, 0, 0);
+   }
+
+   ConnKey(DistributedMember member, long startTime, long ackTimeout,
+       long ackSATimeout) {
+     this.member = member;
+     this.startTime = startTime;
+     this.ackTimeout = ackTimeout;
+     this.ackSATimeout = ackSATimeout;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (!(obj instanceof ConnKey)) {
+       return false;
+     }
+     final ConnKey other = (ConnKey) obj;
+     return this.member.equals(other.member);
+   }
+
+   @Override
+   public int hashCode() {
+     return member.hashCode();
+   }
+
+   @Override
+   public String toString() {
+     return member.toString();
+   }
+ }
+
+ /** The connection pool used for sending messages keyed by members. */
+ private final QueryKeyedObjectPool<ConnKey, Connection> connectionPool;
+
+
/**
* Cause calling thread to share communication resources with other threads.
*/
@@ -192,11 +236,69 @@ public class ConnectionTable {
? new SystemTimer(conduit.getDM().getSystem(), true) : null;
this.threadOrderedConnMap = new ThreadLocal();
this.threadConnMaps = new ArrayList();
- this.threadConnectionMap = new ConcurrentHashMap();
this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
this.socketCloser = new SocketCloser();
+
+ if (!conduit.useNIOStream()) {
+ this.threadConnectionMap = new ConcurrentHashMap<>();
+ this.connectionPool = null;
+ return;
+ }
+ this.threadConnectionMap = null; // no thread-local maps with NIOStream
+ // The factory for creating new Connections for connectionPool.
+ KeyedPooledObjectFactory<ConnKey, Connection> connectionFactory;
+ connectionFactory = new KeyedPooledObjectFactory<ConnKey, Connection>() {
+ @Override
+ public PooledObject<Connection> makeObject(ConnKey key) throws Exception {
+ final TCPConduit owner = ConnectionTable.this.owner;
+ Connection newConn = Connection.createSender(owner.getMembershipManager(),
+ ConnectionTable.this, true /* preserveOrder */, key.member,
+ false /* shared */,
+ true /* pooled */, key.startTime, key.ackTimeout, key.ackSATimeout);
+ if (logger.isDebugEnabled()) {
+ logger.debug("ConnectionTable: created a pooled ordered connection: " +
+ newConn);
+ }
+ owner.getStats().incSenders(false/* shared */, true /* preserveOrder */);
+ return new DefaultPooledObject<>(newConn);
+ }
+
+ @Override
+ public void destroyObject(ConnKey key,
+ PooledObject<Connection> p) throws Exception {
+ closeCon("Communications are shutting down", p.getObject());
+ }
+
+ @Override
+ public boolean validateObject(ConnKey key, PooledObject<Connection> p) {
+ return !p.getObject().isClosing();
+ }
+
+ @Override
+ public void activateObject(ConnKey key,
+ PooledObject<Connection> p) throws Exception {}
+
+ @Override
+ public void passivateObject(ConnKey key,
+ PooledObject<Connection> p) throws Exception {}
+ };
+ this.connectionPool = new QueryKeyedObjectPool<>(connectionFactory,
+ conduit.getCancelCriterion());
+ final int numConnections = Math.max(ClusterDistributionManager.MAX_PR_THREADS_SET,
+ // SNAP-1682
+ Math.max(32, Math.min(64, ClusterDistributionManager.MAX_PR_THREADS)));
+ this.connectionPool.setMaxTotalPerKey(numConnections);
+ this.connectionPool.setMaxIdlePerKey(numConnections);
+ this.connectionPool.setTestOnBorrow(true);
+ this.connectionPool.setTestOnReturn(true);
+ final int connectionTimeout = owner.idleConnectionTimeout;
+ this.connectionPool.setTimeBetweenEvictionRunsMillis(connectionTimeout);
+ // default idle-timeout for a connection is 5 minutes
+ this.connectionPool.setMinEvictableIdleTimeMillis(connectionTimeout * 5);
}
+
+
private Executor createThreadPoolForIO(boolean conserveSockets) {
if (conserveSockets) {
return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader");
@@ -287,7 +389,7 @@ public class ConnectionTable {
Connection con = null;
try {
con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, id,
- sharedResource, startTime, ackThreshold, ackSAThreshold);
+ sharedResource, (connectionPool != null), startTime, ackThreshold, ackSAThreshold);
this.owner.getStats().incSenders(sharedResource, preserveOrder);
} finally {
// our connection failed to notify anyone waiting for our pending con
@@ -373,7 +475,8 @@ public class ConnectionTable {
throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
- final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
+ final ConcurrentMap<InternalDistributedMember, Object> m =
+ preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
PendingConnection pc = null; // new connection, if needed
Object mEntry = null; // existing connection (if we don't create a new one)
@@ -389,7 +492,7 @@ public class ConnectionTable {
}
if (mEntry == null) {
pc = new PendingConnection(preserveOrder, id);
- m.put(id, pc);
+ m.put((InternalDistributedMember) id, pc);
}
} // synchronized
@@ -425,10 +528,17 @@ public class ConnectionTable {
return result;
}
+ /**
+  * Asks the owner's cancel criterion to check for a cancel in progress, then throws
+  * {@link DistributedSystemDisconnectedException} if this connection table is closed.
+  */
+ private void checkClosing() {
+   owner.getCancelCriterion().checkCancelInProgress(null);
+   if (!this.closed) {
+     return;
+   }
+   throw new DistributedSystemDisconnectedException("Connection table is closed");
+ }
+
/**
* Must be looking for an ordered connection that this thread owns
*
- * @param id stub on which to create the connection
+ * @param id member on which to create the connection
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
@@ -437,6 +547,19 @@ public class ConnectionTable {
*/
Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+ if (this.connectionPool != null) {
+ try {
+ return this.connectionPool.borrowObject(
+ new ConnKey(id, startTime, ackTimeout, ackSATimeout));
+ } catch (IOException | RuntimeException e) {
+ checkClosing();
+ throw e;
+ } catch (Exception e) {
+ checkClosing();
+ throw new IllegalStateException(e);
+ }
+ }
+
Connection result = null;
// Look for result in the thread local
@@ -474,7 +597,7 @@ public class ConnectionTable {
// OK, we have to create a new connection.
result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */,
- id, false /* shared */, startTime, ackTimeout, ackSATimeout);
+ id, false, false, startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
logger.debug("ConnectionTable: created an ordered connection: {}", result);
}
@@ -498,7 +621,7 @@ public class ConnectionTable {
// Since it's a concurrent map, we just try to put it and then
// return whichever we got.
- Object o = this.threadConnectionMap.putIfAbsent(id, al);
+ Object o = this.threadConnectionMap.putIfAbsent((InternalDistributedMember) id, al);
if (o != null) {
al = (ArrayList) o;
}
@@ -518,6 +641,18 @@ public class ConnectionTable {
return result;
}
+ /**
+  * Hands a connection back to the connection pool, keyed by the connection's remote
+  * address. Does nothing if the connection is not pooled or if this table has no pool.
+  * A runtime failure while returning the connection is logged and otherwise ignored.
+  *
+  * @param conn the connection to return
+  */
+ public final void releasePooledConnection(Connection conn) {
+   if (conn.isPooled() && this.connectionPool != null) {
+     try {
+       this.connectionPool.returnObject(new ConnKey(conn.getRemoteAddress()), conn);
+     } catch (RuntimeException re) {
+       // log any exception in returning to pool and move on; pass the exception itself
+       // so that its stack trace is not lost
+       logger.warn("Unexpected exception in returning connection to pool", re);
+     }
+   }
+ }
+
+
/** schedule an idle-connection timeout task */
private void scheduleIdleTimeout(Connection conn) {
if (conn == null) {
@@ -701,6 +836,9 @@ public class ConnectionTable {
m.clear();
}
}
+ if (connectionPool != null) {
+ connectionPool.close();
+ }
this.socketCloser.close();
}
@@ -784,12 +922,17 @@ public class ConnectionTable {
}
}
if (!needsRemoval) {
- ConcurrentMap cm = this.threadConnectionMap;
+ ConcurrentMap<InternalDistributedMember, List> cm = this.threadConnectionMap;
if (cm != null) {
- ArrayList al = (ArrayList) cm.get(memberID);
+ List al = cm.get(memberID);
needsRemoval = al != null && al.size() > 0;
}
}
+ ConnKey connKey = null;
+ if (this.connectionPool != null) {
+   connKey = new ConnKey(memberID);
+   // Combine with the result of the checks above rather than overwriting it: an empty
+   // pool for this member must not cancel a removal that those checks already require.
+   needsRemoval = needsRemoval || this.connectionPool.getNumTotal(connKey) > 0;
+ }
if (needsRemoval) {
InternalDistributedMember remoteAddress = null;
@@ -809,9 +952,9 @@ public class ConnectionTable {
}
{
- ConcurrentMap cm = this.threadConnectionMap;
+ ConcurrentMap<InternalDistributedMember, List> cm = this.threadConnectionMap;
if (cm != null) {
- ArrayList al = (ArrayList) cm.remove(memberID);
+ List al = cm.remove(memberID);
if (al != null) {
synchronized (al) {
for (Iterator it = al.iterator(); it.hasNext();) {
@@ -827,6 +970,15 @@ public class ConnectionTable {
}
}
+ // close all connections for the given address
+ if (this.connectionPool != null) {
+ this.connectionPool.clear(connKey);
+ this.connectionPool.foreachObject(connKey, c -> {
+ closeCon(reason, c);
+ return true;
+ });
+ }
+
// close any sockets that are in the process of being connected
Set toRemove = new HashSet();
synchronized (connectingSockets) {
@@ -889,7 +1041,7 @@ public class ConnectionTable {
}
/** check to see if there are still any receiver threads for the given end-point */
- protected boolean hasReceiversFor(DistributedMember endPoint) {
+ boolean hasReceiversFor(DistributedMember endPoint) {
synchronized (this.receivers) {
for (Iterator it = receivers.iterator(); it.hasNext();) {
Connection con = (Connection) it.next();
@@ -901,10 +1053,11 @@ public class ConnectionTable {
return false;
}
- private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub,
+ private static void removeFromThreadConMap(ConcurrentMap<InternalDistributedMember, List> cm,
+ DistributedMember memberId,
Connection c) {
if (cm != null) {
- ArrayList al = (ArrayList) cm.get(stub);
+ List al = cm.get(memberId);
if (al != null) {
synchronized (al) {
al.remove(c);
@@ -914,9 +1067,10 @@ public class ConnectionTable {
}
protected void removeThreadConnection(DistributedMember stub, Connection c) {
- /*
- * if (this.closed) { return; }
- */
+ // no thread-locals when using NIOStream connection pooling
+ if (connectionPool != null) {
+ return;
+ }
removeFromThreadConMap(this.threadConnectionMap, stub, c);
Map m = (Map) this.threadOrderedConnMap.get();
if (m != null) {
@@ -954,9 +1108,7 @@ public class ConnectionTable {
*
* @see SystemFailure#loadEmergencyClasses()
*/
- public static void loadEmergencyClasses() {
- // don't go any further, Frodo!
- }
+ public static void loadEmergencyClasses() {}
/**
* Clears lastInstance. Does not yet close underlying sockets, but probably not strictly
@@ -965,7 +1117,7 @@ public class ConnectionTable {
* @see SystemFailure#emergencyClose()
*/
public static void emergencyClose() {
- ConnectionTable ct = (ConnectionTable) lastInstance.get();
+ ConnectionTable ct = lastInstance.get();
if (ct == null) {
return;
}
@@ -973,6 +1125,9 @@ public class ConnectionTable {
}
public void removeAndCloseThreadOwnedSockets() {
+ if (connectionPool != null) {
+ return;
+ }
Map m = (Map) this.threadOrderedConnMap.get();
if (m != null) {
// Static cleanup may intervene; we MUST synchronize.
@@ -991,7 +1146,7 @@ public class ConnectionTable {
}
public static void releaseThreadsSockets() {
- ConnectionTable ct = (ConnectionTable) lastInstance.get();
+ ConnectionTable ct = lastInstance.get();
if (ct == null) {
return;
}
@@ -1007,9 +1162,20 @@ public class ConnectionTable {
*/
protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
- ConcurrentMap cm = this.threadConnectionMap;
+ if (this.connectionPool != null) {
+ this.connectionPool.foreachObject(new ConnKey(member), conn -> {
+ if (!conn.isSharedResource() && conn.getOriginatedHere()
+ && conn.getPreserveOrder()) {
+ result.put(conn.getUniqueId(), conn.getMessagesSent());
+ }
+ return true;
+ });
+ return;
+ }
+
+ ConcurrentMap<InternalDistributedMember, List> cm = this.threadConnectionMap;
if (cm != null) {
- ArrayList al = (ArrayList) cm.get(member);
+ List al = cm.get(member);
if (al != null) {
synchronized (al) {
al = new ArrayList(al);
@@ -1075,6 +1241,7 @@ public class ConnectionTable {
return this.owner.getDM();
}
+
// public boolean isShuttingDown() {
// return this.owner.isShuttingDown();
// }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 06b60a9..15f4900 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -18,6 +18,7 @@ import java.io.IOException;
import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
@@ -60,10 +61,15 @@ class DirectReplySender implements ReplySender {
logger.trace(LogMarker.DM_VERBOSE, "Sending a direct reply {} to {}", msg,
conn.getRemoteAddress());
}
- ArrayList<Connection> conns = new ArrayList<Connection>(1);
+ List<Connection> conns = new ArrayList<>(1);
conns.add(conn);
- MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+ BaseMsgStreamer ms = null;
try {
+ if (conn.useNIOStream()) {
+ ms = MsgChannelStreamer.create(conns, msg, false, DUMMY_STATS);
+ } else {
+ ms = MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+ }
ms.writeMessage();
ConnectExceptions ce = ms.getConnectExceptions();
if (ce != null && !ce.getMembers().isEmpty()) {
@@ -81,7 +87,9 @@ class DirectReplySender implements ReplySender {
"Unknown error serializing message", ex);
} finally {
try {
- ms.close();
+ if (ms != null) {
+ ms.close();
+ }
} catch (IOException e) {
throw new InternalGemFireException("Unknown error serializing message", e);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelDestreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelDestreamer.java
new file mode 100644
index 0000000..ba57e42
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelDestreamer.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.shared.unsafe.ChannelBufferUnsafeDataInputStream;
+
+/**
+ * Direct streaming buffered reads from TCP channels.
+ */
+public final class MsgChannelDestreamer
+ extends ChannelBufferUnsafeDataInputStream
+ implements VersionedDataStream {
+
+ private final Connection conn;
+ private Version remoteVersion;
+ private long newMessageStart;
+ private boolean newMessage;
+
+ MsgChannelDestreamer(Connection conn, SocketChannel channel, int bufferSize) {
+ super(channel, bufferSize);
+ this.conn = conn;
+ }
+
+ void setRemoteVersion(Version version) {
+ this.remoteVersion = version;
+ }
+
+ void startNewMessage() {
+ this.newMessageStart = this.bytesRead;
+ this.newMessage = true;
+ }
+
+ int getLastMessageLength() {
+ return (int) (this.bytesRead - this.newMessageStart);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Version getVersion() {
+ return this.remoteVersion;
+ }
+
+ @Override
+ protected int refillBuffer(final ByteBuffer channelBuffer,
+ final int tryReadBytes, final String eofMessage) throws IOException {
+ final byte originalState;
+ synchronized (conn.stateLock) {
+ originalState = conn.connectionState;
+ if (originalState == Connection.STATE_IDLE) {
+ conn.connectionState = Connection.STATE_READING;
+ }
+ }
+ final int numBytes = super.refillBuffer(channelBuffer,
+ tryReadBytes, eofMessage);
+ if (originalState == Connection.STATE_IDLE) {
+ synchronized (conn.stateLock) {
+ conn.connectionState = Connection.STATE_IDLE;
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ protected int readIntoBuffer(ByteBuffer buffer) throws IOException {
+ if (!conn.connected) {
+ throw new ClosedChannelException();
+ }
+ int numBytes = super.readIntoBuffer(buffer);
+ if (numBytes > 0) {
+ conn.getConduit().getStats().incMessagesBeingReceived(
+ this.newMessage, numBytes);
+ this.newMessage = false;
+ }
+ return numBytes;
+ }
+
+ @Override
+ public long getParkNanosMax() {
+ // increased timeout to enable detection of failed sockets
+ return 90000000000L;
+ }
+
+ @Override
+ protected int readIntoBufferNoWait(ByteBuffer buffer)
+ throws IOException {
+ return readIntoBuffer(buffer);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelStreamer.java
new file mode 100644
index 0000000..ca91bb3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelStreamer.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.shared.unsafe.ChannelBufferUnsafeDataOutputStream;
+import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
+
+/**
+ * A {@link BaseMsgStreamer} implementation that writes directly to the channels
+ * in a buffered manner, streaming the data without any chunking.
+ */
+public final class MsgChannelStreamer extends ChannelBufferUnsafeDataOutputStream
+ implements BaseMsgStreamer, VersionedDataStream {
+
+ private final DistributionMessage msg;
+ private final boolean directReply;
+ private final List<Connection> connections;
+ private final StoppableReentrantLock[] locks;
+ private final DMStats stats;
+ private final Version remoteVersion;
+
+ /**
+ * Any exceptions that happen during sends.
+ */
+ private ConnectExceptions ce;
+
+ private static final Comparator<Connection> idCompare = (c1, c2) -> {
+ final long id1 = c1.getUniqueId();
+ final long id2 = c2.getUniqueId();
+ return (id1 < id2) ? -1 : ((id1 == id2) ? 0 : 1);
+ };
+
+ private MsgChannelStreamer(List<Connection> connections,
+ Connection firstConn, DistributionMessage msg, boolean directReply,
+ DMStats stats, Version remoteVersion) throws SocketException {
+ // apply the VM.maxDirectMemory() limit for these on-the-fly streamers
+ super(firstConn.getSocket().getChannel(),
+ firstConn.getConduit().tcpBufferSize);
+ this.msg = msg;
+ this.directReply = directReply;
+ final int numConnections = connections.size();
+ // keep the connections sorted by unique id for consistent lock ordering
+ // of shared connections
+ if (numConnections > 1) {
+ connections.sort(idCompare);
+ }
+ this.connections = connections;
+ this.locks = new StoppableReentrantLock[numConnections];
+ this.stats = stats;
+ this.remoteVersion = remoteVersion;
+ }
+
+ private static ArrayList<Connection> singleConnection(Connection conn) {
+ ArrayList<Connection> list = new ArrayList<>(1);
+ list.add(conn);
+ return list;
+ }
+
+ public static BaseMsgStreamer create(List<Connection> connections,
+ final DistributionMessage msg, final boolean directReply,
+ DMStats stats) throws SocketException {
+ // if all connections have the same version then use one MsgChannelStreamer
+ Connection firstConn = connections.get(0);
+ Version remoteVersion = firstConn.remoteVersion;
+ boolean hasSameVersions = true;
+ final int numConnections = connections.size();
+ // choose firstConn for locking using some criteria (e.g. min localPort)
+ for (int i = 1; i < numConnections; i++) {
+ Connection conn = connections.get(i);
+ if (conn.getSocket().getLocalPort() < firstConn.getSocket()
+ .getLocalPort()) {
+ firstConn = conn;
+ }
+ Version connVersion = conn.remoteVersion;
+ if (remoteVersion == null) {
+ if (connVersion == null || connVersion.equals(Version.CURRENT)) {
+ continue;
+ }
+ } else if (remoteVersion.equals(connVersion)) {
+ continue;
+ }
+ hasSameVersions = false;
+ break;
+ }
+ if (hasSameVersions) {
+ return new MsgChannelStreamer(connections, firstConn, msg,
+ directReply, stats, remoteVersion);
+ } else {
+ // create list of streamers (separate even if there are common versions
+ // to keep things simple as this case itself is a rare one)
+ ArrayList<BaseMsgStreamer> streamers = new ArrayList<>(numConnections);
+ for (int i = 0; i < numConnections; i++) {
+ Connection conn = connections.get(i);
+ streamers.add(new MsgChannelStreamer(singleConnection(conn), conn,
+ msg, directReply, stats, conn.remoteVersion));
+ }
+ return new MsgStreamerList(streamers);
+ }
+ }
+
+ @Override
+ public void reserveConnections(long startTime, long ackTimeout,
+ long ackSDTimeout) {
+ final List<Connection> connections = this.connections;
+ final int numConnections = connections.size();
+ for (int i = 0; i < numConnections; i++) {
+ Connection conn = connections.get(i);
+ conn.setInUse(true, startTime, ackTimeout, ackSDTimeout, connections);
+ if (ackTimeout > 0) {
+ conn.scheduleAckTimeouts();
+ }
+ }
+ }
+
+ @Override
+ public List<?> getSentConnections() {
+ return this.connections;
+ }
+
+ @Override
+ public ConnectExceptions getConnectExceptions() {
+ return this.ce;
+ }
+
+ private void acquireLocks() {
+ final List<Connection> connections = this.connections;
+ final int numConnections = connections.size();
+ for (int i = 0; i < numConnections; i++) {
+ final StoppableReentrantLock lock = connections.get(i).getOutLock();
+ lock.lock();
+ // assign only after successful acquisition of the lock
+ this.locks[i] = lock;
+ }
+ }
+
+ private void releaseLocks() {
+ final int numLocks = this.locks.length;
+ for (int i = 0; i < numLocks; i++) {
+ this.locks[i].unlock();
+ this.locks[i] = null;
+ }
+ }
+
+ @Override
+ public int writeMessage() throws IOException {
+ final long startNumBytes = this.bytesWritten;
+ final DMStats stats = this.stats;
+ acquireLocks();
+ try {
+ long start = stats.startMsgSerialization();
+ byte msgType = Connection.NORMAL_MSG_TYPE;
+ if (directReply) {
+ msgType |= Connection.DIRECT_ACK_BIT;
+ }
+ putByte(msgType);
+ InternalDataSerializer.writeDSFID(msg, this);
+ stats.endMsgSerialization(start);
+ flush();
+ } finally {
+ releaseLocks();
+ }
+ if (msg.containsRegionContentChange()) {
+ final List<Connection> connections = this.connections;
+ final int numConnections = connections.size();
+ for (int i = 0; i < numConnections; i++) {
+ connections.get(i).incMessagesSent();
+ }
+ }
+ return (int) (this.bytesWritten - startNumBytes);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // flush should already be done in the normal course, but for exception
+ // cases discard whatever is left in the buffer
+ this.addrPosition = this.addrLimit = 0;
+ releaseBuffer();
+ }
+
+ @Override
+ public void release() {
+ // just clear the buffer
+ this.buffer.clear();
+ resetBufferPositions();
+ }
+
+ @Override
+ public Version getVersion() {
+ return this.remoteVersion;
+ }
+
+ private int writeBufferToChannel(final ByteBuffer buffer,
+ final Connection conn, final SocketChannel channel,
+ final boolean flush) {
+ final DMStats stats = this.stats;
+ final boolean origSocketInUse = conn.socketInUse;
+ byte originalState = -1;
+ try {
+ if (!conn.connected) {
+ throw new ConnectionException("not connected to " + conn.getRemoteAddress());
+ }
+ synchronized (conn.stateLock) {
+ originalState = conn.connectionState;
+ conn.connectionState = Connection.STATE_SENDING;
+ }
+ conn.socketInUse = true;
+ if (!conn.isSharedResource()) {
+ stats.incTOSentMsg();
+ }
+
+ long startLock = stats.startSocketLock();
+ stats.endSocketLock(startLock);
+ long start = stats.startSocketWrite(true);
+ int numWritten = 0;
+ try {
+ if (flush) {
+ do {
+ numWritten += super.writeBuffer(buffer, channel);
+ } while (buffer.hasRemaining());
+ } else {
+ numWritten += super.writeBufferNoWait(buffer, channel);
+ }
+ return numWritten;
+ } finally {
+ stats.endSocketWrite(true, start, numWritten, 0);
+ }
+
+ } catch (IOException ex) {
+ if (this.ce == null)
+ this.ce = new ConnectExceptions();
+ this.ce.addFailure(conn.getRemoteAddress(), ex);
+ conn.closeForReconnect("closing due to unexpected IOException");
+ return -1;
+ } catch (ConnectionException ex) {
+ if (this.ce == null)
+ this.ce = new ConnectExceptions();
+ this.ce.addFailure(conn.getRemoteAddress(), ex);
+ conn.closeForReconnect("closing due to unexpected ConnetionException");
+ return -1;
+ } finally {
+ conn.accessed();
+ conn.socketInUse = origSocketInUse;
+ synchronized (conn.stateLock) {
+ conn.connectionState = originalState;
+ }
+ }
+ }
+
+ @Override
+ protected int writeBuffer(final ByteBuffer buffer,
+ WritableByteChannel channel) throws IOException {
+ final List<Connection> connections = this.connections;
+ final int numConnections = connections.size();
+ final int bufferPosition = buffer.position();
+ final int bufferSize = buffer.limit() - bufferPosition;
+ if (numConnections == 1) {
+ Connection conn = connections.get(0);
+ if (writeBufferToChannel(buffer, conn, conn.getSocket().getChannel(),
+ true) < 0) {
+ connections.remove(0);
+ }
+ return bufferSize;
+ }
+
+ ArrayList<Connection> pendingConnections = null;
+ IntArrayList pendingWritten = null;
+ int numWritten;
+ // write to all the channels
+ for (int i = numConnections - 1; i >= 0; i--) {
+ Connection conn = connections.get(i);
+ SocketChannel socketChannel = conn.getSocket().getChannel();
+ // rewind buffer to the start
+ buffer.position(bufferPosition);
+ if ((numWritten = writeBufferToChannel(buffer, conn,
+ socketChannel, false)) >= 0) {
+ if (numWritten < bufferSize) {
+ // put into pending list and go on to next channel
+ if (pendingConnections == null) {
+ int initSize = numConnections >>> 1;
+ pendingConnections = new ArrayList<>(initSize);
+ pendingWritten = new IntArrayList(initSize);
+ }
+ pendingConnections.add(conn);
+ pendingWritten.add(bufferPosition + numWritten);
+ }
+ } else {
+ connections.remove(i);
+ }
+ }
+ // now process the pending channels in a blocking manner
+ if (pendingConnections != null) {
+ processPendingChannels(buffer, pendingConnections, pendingWritten);
+ }
+
+ // position the buffer at the end indicating everything was written
+ buffer.position(buffer.limit());
+
+ this.bytesWritten += bufferSize;
+ return bufferSize;
+ }
+
+ private void processPendingChannels(ByteBuffer buffer,
+ ArrayList<Connection> pendingConnections, IntArrayList written)
+ throws IOException {
+ final int numChannels = pendingConnections.size();
+ for (int i = 0; i < numChannels; i++) {
+ final Connection conn = pendingConnections.get(i);
+ // move buffer to appropriate position
+ buffer.position(written.getInt(i));
+ if (writeBufferToChannel(buffer, conn, conn.getSocket().getChannel(),
+ true) < 0) {
+ this.connections.remove(conn);
+ }
+ }
+ }
+
+ @Override
+ public long getParkNanosMax() {
+ // never throw timeout here
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ protected int writeBufferNoWait(ByteBuffer buffer,
+ WritableByteChannel channel) throws IOException {
+ // all writes are blocking for messages
+ return writeBuffer(buffer, channel);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 600d967..b374066 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -31,7 +31,7 @@ import org.apache.geode.internal.ObjToByteArraySerializer;
*
*/
public class MsgOutputStream extends OutputStream implements ObjToByteArraySerializer {
- private final ByteBuffer buffer;
+ final ByteBuffer buffer;
/**
* The caller of this constructor is responsible for managing the allocated instance.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index a09d2f2..9598024 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -90,10 +90,7 @@ public class MsgStreamer extends OutputStream
private long serStartTime;
private final boolean directReply;
- /**
- * Called to free up resources used by this streamer after the streamer has produced its message.
- */
- protected void release() {
+ public void release() {
MsgIdGenerator.release(this.msgId);
this.buffer.clear();
this.overflowBuf = null;
@@ -171,8 +168,8 @@ public class MsgStreamer extends OutputStream
} else {
// if there is a versioned stream created, then split remaining
// connections to unversioned stream
- final ArrayList<MsgStreamer> streamers =
- new ArrayList<MsgStreamer>(versionToConnMap.size() + 1);
+ final List<BaseMsgStreamer> streamers =
+ new ArrayList<>(versionToConnMap.size() + 1);
final int sendBufferSize = firstCon.getSendBufferSize();
if (numCons > numVersioned) {
// allocating list of numCons size so that as the result of
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
index 5a51af0..b17829c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
@@ -34,11 +34,11 @@ public class MsgStreamerList implements BaseMsgStreamer {
private static final Logger logger = LogService.getLogger();
/**
- * List of {@link MsgStreamer}s encapsulated by this MsgStreamerList.
+ * List of {@link BaseMsgStreamer}s encapsulated by this MsgStreamerList.
*/
- private final List<MsgStreamer> streamers;
+ private final List<BaseMsgStreamer> streamers;
- MsgStreamerList(List<MsgStreamer> streamers) {
+ MsgStreamerList(List<BaseMsgStreamer> streamers) {
this.streamers = streamers;
}
@@ -47,7 +47,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
*/
@Override
public void reserveConnections(long startTime, long ackTimeout, long ackSDTimeout) {
- for (MsgStreamer streamer : this.streamers) {
+ for (BaseMsgStreamer streamer : this.streamers) {
streamer.reserveConnections(startTime, ackTimeout, ackSDTimeout);
}
}
@@ -60,7 +60,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
int result = 0;
RuntimeException ex = null;
IOException ioex = null;
- for (MsgStreamer streamer : this.streamers) {
+ for (BaseMsgStreamer streamer : this.streamers) {
if (ex != null) {
streamer.release();
// TODO: shouldn't we call continue here?
@@ -100,7 +100,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
@Override
public List<?> getSentConnections() {
List<Object> sentCons = Collections.emptyList();
- for (MsgStreamer streamer : this.streamers) {
+ for (BaseMsgStreamer streamer : this.streamers) {
if (sentCons.size() == 0) {
sentCons = (List<Object>) streamer.getSentConnections();
} else {
@@ -116,7 +116,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
@Override
public ConnectExceptions getConnectExceptions() {
ConnectExceptions ce = null;
- for (MsgStreamer streamer : this.streamers) {
+ for (BaseMsgStreamer streamer : this.streamers) {
if (ce == null) {
ce = streamer.getConnectExceptions();
} else {
@@ -141,7 +141,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
public void close() throws IOException {
// only throw the first exception and try to close all
IOException ex = null;
- for (MsgStreamer m : this.streamers) {
+ for (BaseMsgStreamer m : this.streamers) {
try {
m.close();
} catch (IOException e) {
@@ -157,4 +157,11 @@ public class MsgStreamerList implements BaseMsgStreamer {
throw ex;
}
}
+
+ @Override
+ public void release() {
+ for (BaseMsgStreamer m : this.streamers) {
+ m.release();
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/QueryKeyedObjectPool.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/QueryKeyedObjectPool.java
new file mode 100644
index 0000000..192a33e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/QueryKeyedObjectPool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.geode.internal.tcp;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Predicate;
+
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+
+import org.apache.geode.CancelCriterion;
+
+/**
+ * A configurable <code>KeyedObjectPool</code> implementation.
+ * <p>
+ * This extends the commons-pool2 GenericKeyedObjectPool and differs only
+ * in additional methods to query objects by key.
+ *
+ * @param <K> The type of keys maintained by this pool.
+ * @param <T> Type of element pooled in this pool.
+ *
+ * @see GenericObjectPool
+ * @see GenericKeyedObjectPool
+ */
+public final class QueryKeyedObjectPool<K, T>
+ extends GenericKeyedObjectPool<K, T> {
+
+ /**
+ * The "poolMap" field from the parent class obtained using reflection.
+ */
+ private final ConcurrentHashMap<?, ?> subPoolMap;
+
+ /**
+ * The "keyLock" field from the parent class obtained using reflection.
+ */
+ private final ReadWriteLock globalKeyLock;
+
+ /**
+ * Check for cancellation with this in borrowObject.
+ */
+ private final CancelCriterion cancelCriterion;
+
+ /**
+ * The "allObjects" field from the ObjectDeque class obtained using reflection.
+ */
+ private volatile Field allObjectsField;
+
+ /**
+ * Create a new <code>QueryKeyedObjectPool</code> using defaults from
+ * {@link GenericKeyedObjectPoolConfig}.
+ *
+ * @param factory the factory to be used to create entries
+ * @param cancelCriterion to check for cancellation of borrowObject
+ */
+ public QueryKeyedObjectPool(KeyedPooledObjectFactory<K, T> factory,
+ CancelCriterion cancelCriterion) {
+ this(factory, new GenericKeyedObjectPoolConfig(), cancelCriterion);
+ }
+
+ /**
+ * Create a new <code>QueryKeyedObjectPool</code> using a specific
+ * configuration.
+ *
+ * @param factory the factory to be used to create entries
+ * @param config The configuration to use for this pool instance.
+ * The configuration is used by value. Subsequent
+ * changes to the configuration object will not be
+ * reflected in the pool.
+ * @param cancelCriterion to check for cancellation of borrowObject
+ */
+ @SuppressWarnings("WeakerAccess")
+ public QueryKeyedObjectPool(KeyedPooledObjectFactory<K, T> factory,
+ GenericKeyedObjectPoolConfig config, CancelCriterion cancelCriterion) {
+ super(factory, config);
+
+ try {
+ final Class<?> superClass = getClass().getSuperclass();
+ Field f = superClass.getDeclaredField("poolMap");
+ f.setAccessible(true);
+ this.subPoolMap = (ConcurrentHashMap<?, ?>) f.get(this);
+
+ f = superClass.getDeclaredField("keyLock");
+ f.setAccessible(true);
+ this.globalKeyLock = (ReadWriteLock) f.get(this);
+
+ this.cancelCriterion = cancelCriterion;
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Failed to initialize QueryKeyedObjectPool", e);
+ }
+ }
+
+ private Map<?, ?> getAllObjectsFromDeque(Object deque) {
+ try {
+ Field allObjects = this.allObjectsField;
+ if (allObjects == null) {
+ synchronized (this) {
+ allObjects = this.allObjectsField;
+ if (allObjects == null) {
+ Field f = deque.getClass().getDeclaredField("allObjects");
+ f.setAccessible(true);
+ this.allObjectsField = allObjects = f;
+ }
+ }
+ }
+ return (Map<?, ?>) allObjects.get(deque);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to read pool queue", e);
+ }
+ }
+
+ @Override
+ public T borrowObject(K key, long borrowMaxWaitMillis) throws Exception {
+ // An issue with GenericKeyedObjectPool: it increments the createdCount
+ // first, then checks if it exceeds maxTotalPerKey. It can happen due to
+ // concurrency then that one of the threads goes into a wait due to that
+ // even though connections have not actually been created yet. The creates
+ // could all fail due to remote node going down, but then those threads
+ // that started to wait will still keep waiting (SNAP-1508, SNAP-1509).
+ // Hence this override does it with smaller timeout in a loop.
+ if (borrowMaxWaitMillis < 0) {
+ borrowMaxWaitMillis = Integer.MAX_VALUE;
+ }
+ // retries at max 1 second intervals
+ final long loopWaitMillis = Math.min(1000L, borrowMaxWaitMillis);
+ long startTime = 0L;
+ while (true) {
+ try {
+ return super.borrowObject(key, loopWaitMillis);
+ } catch (NoSuchElementException nse) {
+ // check for cancellation
+ this.cancelCriterion.checkCancelInProgress(nse);
+ // retry till borrowMaxWaitMillis
+ long currentTime = System.currentTimeMillis();
+ if (startTime == 0L) {
+ // first time assume it failed after loopWaitMillis to
+ // optimistically avoid a currentTimeMillis call in first loop
+ startTime = currentTime - loopWaitMillis;
+ }
+ if ((currentTime - startTime) >= borrowMaxWaitMillis) {
+ throw nse;
+ }
+ }
+ }
+ }
+
+ public int getNumTotal(K key) {
+ final Object objectDeque = subPoolMap.get(key);
+ if (objectDeque != null) {
+ return getAllObjectsFromDeque(objectDeque).size();
+ } else {
+ return 0;
+ }
+ }
+
+ /**
+ * Apply the given function on each object for a key both idle (waiting
+ * to be borrowed) and active (currently borrowed).
+ */
+ public void foreachObject(K key, Predicate<T> predicate) {
+ Lock readLock = globalKeyLock.readLock();
+ readLock.lock();
+ try {
+ Object queue = subPoolMap.get(key);
+ if (queue != null) {
+ for (Object p : getAllObjectsFromDeque(queue).values()) {
+ @SuppressWarnings("unchecked")
+ final PooledObject<T> pooledObject = (PooledObject<T>) p;
+ if (!predicate.test(pooledObject.getObject())) {
+ break;
+ }
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 0057847..f69b742 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.tcp;
import java.io.IOException;
-import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -117,6 +116,11 @@ public class TCPConduit implements Runnable {
private static boolean USE_NIO;
/**
+ * Use the new NIO-based streaming, which disables chunking.
+ */
+ private static boolean USE_NIO_STREAM;
+
+ /**
* use direct ByteBuffers instead of heap ByteBuffers for NIO operations
*/
static boolean useDirectBuffers;
@@ -150,6 +154,7 @@ public class TCPConduit implements Runnable {
useSSL = Boolean.getBoolean("p2p.useSSL");
// only use nio if not SSL
USE_NIO = !useSSL && !Boolean.getBoolean("p2p.oldIO");
+ USE_NIO_STREAM = USE_NIO && !Boolean.getBoolean("p2p.disableNIOStream");
// only use direct buffers if we are using nio
useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
@@ -279,26 +284,6 @@ public class TCPConduit implements Runnable {
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
this.useNIO = USE_NIO;
- if (this.useNIO) {
- InetAddress addr = address;
- if (addr == null) {
- try {
- addr = SocketCreator.getLocalHost();
- } catch (java.net.UnknownHostException e) {
- throw new ConnectionException("Unable to resolve localHost address", e);
- }
- }
- // JDK bug 6230761 - NIO can't be used with IPv6 on Windows
- if (addr instanceof Inet6Address) {
- String os = System.getProperty("os.name");
- if (os != null) {
- if (os.indexOf("Windows") != -1) {
- this.useNIO = false;
- }
- }
- }
- }
-
startAcceptor();
}
@@ -841,6 +826,16 @@ public class TCPConduit implements Runnable {
}
/**
+ * Returns true if NIO classes with buffered streaming are being used
+ * for the server socket. This differs from "useNIO" in that messages are
+ * streamed directly to sockets, and read in a streaming manner, through
+ * buffered DataOutput/Input streams rather than being chunked.
+ */
+ public boolean useNIOStream() {
+ return USE_NIO_STREAM;
+ }
+
+ /**
* gets the address of this conduit's ServerSocket endpoint
*/
public InetSocketAddress getSocketId() {
@@ -894,6 +889,7 @@ public class TCPConduit implements Runnable {
for (;;) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
+ boolean returningCon = false;
try {
// If this is the second time through this loop, we had
// problems. Tear down the connection so that it gets
@@ -944,7 +940,10 @@ public class TCPConduit implements Runnable {
conn.closeForReconnect("closing before retrying");
} catch (CancelException ex) {
throw ex;
- } catch (Exception ex) {
+ } catch (Exception ignored) {
+ } finally {
+ releasePooledConnection(conn);
+ conn = null;
}
}
} // not first time in loop
@@ -970,10 +969,14 @@ public class TCPConduit implements Runnable {
logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn,
conn.hashCode());
}
- conn.closeOldConnection("closing old connection");
- conn = null;
- retryForOldConnection = true;
- debugRetry = true;
+ try {
+ conn.closeOldConnection("closing old connection");
+ } finally {
+ releasePooledConnection(conn);
+ conn = null;
+ retryForOldConnection = true;
+ debugRetry = true;
+ }
}
} while (retryForOldConnection);
if (debugRetry && logger.isDebugEnabled()) {
@@ -1064,8 +1067,13 @@ public class TCPConduit implements Runnable {
logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
}
}
+ returningCon = true;
return conn;
} finally {
+ // need to return unused connections to pool
+ if (!returningCon && conn != null) {
+ releasePooledConnection(conn);
+ }
if (interrupted) {
Thread.currentThread().interrupt();
}
@@ -1073,6 +1081,13 @@ public class TCPConduit implements Runnable {
} // for(;;)
}
+ public final void releasePooledConnection(Connection conn) {
+ final ConnectionTable conTable = this.conTable; // one read for check and call
+ if (conTable != null) { // no table, nothing to do
+ conTable.releasePooledConnection(conn);
+ }
+ }
+
@Override
public String toString() {
return "" + id;
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java
index b548938..153eb5b 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java
@@ -195,4 +195,9 @@ public class UnsafeWrapper {
public void setMemory(long addr, long size, byte v) {
this.unsafe.setMemory(addr, size, v);
}
+
+ public void setMemory(Object obj, long offset, long size, byte value) {
+ this.unsafe.setMemory(obj, offset, size, value); // plain delegation, no checks here
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 075b252..6163b94 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -52,6 +52,7 @@ public class ConnectionJUnitTest {
DistributionManager distMgr = mock(DistributionManager.class);
MembershipManager membership = mock(MembershipManager.class);
TCPConduit conduit = mock(TCPConduit.class);
+ when(conduit.useNIOStream()).thenReturn(false);
// mock the connection table and conduit
diff --git a/geode-core/src/test/resources/expected-pom.xml b/geode-core/src/test/resources/expected-pom.xml
index 9b62224..5cd5bf3 100644
--- a/geode-core/src/test/resources/expected-pom.xml
+++ b/geode-core/src/test/resources/expected-pom.xml
@@ -66,6 +66,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>2.4.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
diff --git a/gradle/dependency-versions.properties b/gradle/dependency-versions.properties
index 0ad0625..3578d4f 100644
--- a/gradle/dependency-versions.properties
+++ b/gradle/dependency-versions.properties
@@ -32,6 +32,7 @@ commons-io.version = 2.6
commons-lang3.version = 3.8.1
commons-logging.version = 1.2
commons-modeler.version = 2.0.1
+commons-pool2.version = 2.4.2
commons-validator.version = 1.6
HikariCP.version = 3.2.0
derby.version = 10.14.2.0