You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/17 11:47:31 UTC
[4/6] ignite git commit: IGNITE-2627: Implemented OdbcNioServerBuffer
and OdbcBufferedParser.
IGNITE-2627: Implemented OdbcNioServerBuffer and OdbcBufferedParser.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb0788c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb0788c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb0788c1
Branch: refs/heads/ignite-1786
Commit: bb0788c1128e7239c294fbc9a0ad17addd2026e7
Parents: a7ffce0
Author: isapego <is...@gridgain.com>
Authored: Tue Feb 16 17:37:26 2016 +0300
Committer: isapego <is...@gridgain.com>
Committed: Tue Feb 16 17:37:26 2016 +0300
----------------------------------------------------------------------
.../processors/odbc/OdbcBufferedParser.java | 96 ++++++++++++++
.../processors/odbc/OdbcNioServerBuffer.java | 129 +++++++++++++++++++
.../internal/processors/odbc/OdbcProcessor.java | 2 +-
modules/platforms/cpp/odbc/src/connection.cpp | 6 +-
4 files changed, 227 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java
new file mode 100644
index 0000000..4ce8a8a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This class implements stream parser based on {@link OdbcNioServerBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is prepended with
+ * 4-byte integer header containing message size. So, the stream structure is as follows:
+ * <pre>
+ * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE |
+ * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ * </pre>
+ */
+public class OdbcBufferedParser implements GridNioParser {
+ /** Buffer metadata key. */
+ private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Direct buffer alocation flag. */
+ private final boolean directBuf;
+
+ /** Message size byte order. */
+ private final ByteOrder order;
+
+ /**
+ * @param directBuf Direct buffer.
+ * @param order Message size byte order.
+ */
+ public OdbcBufferedParser(boolean directBuf, ByteOrder order) {
+ this.directBuf = directBuf;
+ this.order = order;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+ OdbcNioServerBuffer nioBuf = ses.meta(BUF_META_KEY);
+
+ // Decode for a given session is called per one thread, so there should not be any concurrency issues.
+ // However, we make some additional checks.
+ if (nioBuf == null) {
+ nioBuf = new OdbcNioServerBuffer(order);
+
+ OdbcNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
+
+ assert old == null;
+ }
+
+ return nioBuf.read(buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+ byte[] msg0 = (byte[])msg;
+
+ ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length + 4) : ByteBuffer.allocate(msg0.length + 4);
+
+ res.order(order);
+
+ res.putInt(msg0.length);
+ res.put(msg0);
+
+ res.flip();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return OdbcBufferedParser.class.getSimpleName();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java
new file mode 100644
index 0000000..20cf4c5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * NIO server buffer.
+ */
+public class OdbcNioServerBuffer {
+ /** Current message data. */
+ private byte[] data;
+
+ /** Count of received bytes of the current message. */
+ private int cnt = -4;
+
+ /** Current message size. */
+ private int msgSize;
+
+ /** Message size byte order. */
+ private final ByteOrder order;
+
+ /**
+ * @param order Byte order.
+ */
+ public OdbcNioServerBuffer(ByteOrder order) {
+ this.order = order;
+ }
+
+ /**
+ * Reset buffer state.
+ */
+ public void reset() {
+ msgSize = 0;
+ cnt = -4;
+ data = null;
+ }
+
+ /**
+ * Checks whether the byte array is filled.
+ *
+ * @return Flag indicating whether byte array is filled or not.
+ */
+ public boolean isFilled() {
+ return cnt > 0 && cnt == msgSize;
+ }
+
+ /**
+ * Get data withing the buffer.
+ *
+ * @return Data.
+ */
+ public byte[] data() {
+ return data;
+ }
+
+ /**
+ * @param buf Buffer.
+ * @return Message bytes or {@code null} if message is not fully read yet.
+ * @throws IgniteCheckedException If failed to parse message.
+ */
+ @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException {
+ if (cnt < 0) {
+ for (; cnt < 0 && buf.hasRemaining(); cnt++) {
+ if (order == ByteOrder.BIG_ENDIAN)
+ msgSize = (msgSize << 8) | buf.get() & 0xFF;
+ else
+ msgSize |= (buf.get() & 0xFF) << (8*(4 + cnt));
+ }
+
+ if (cnt < 0)
+ return null;
+
+ // If count is 0 then message size should be inited.
+ if (msgSize <= 0)
+ throw new IgniteCheckedException("Invalid message size: " + msgSize);
+
+ data = new byte[msgSize];
+ }
+
+ assert msgSize > 0;
+ assert cnt >= 0;
+
+ int remaining = buf.remaining();
+
+ // If there are more bytes in buffer.
+ if (remaining > 0) {
+ int missing = msgSize - cnt;
+
+ // Read only up to message size.
+ if (missing > 0) {
+ int len = missing < remaining ? missing : remaining;
+
+ buf.get(data, cnt, len);
+
+ cnt += len;
+ }
+ }
+
+ if (cnt == msgSize) {
+ byte[] data0 = data;
+
+ reset();
+
+ return data0;
+ }
+ else
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index a3ed422..831ef02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -84,7 +84,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
.socketSendBufferSize(odbcCfg.getSendBufferSize())
.socketReceiveBufferSize(odbcCfg.getReceiveBufferSize())
.sendQueueLimit(odbcCfg.getSendQueueLimit())
- .filters(new GridNioCodecFilter(new GridBufferedParser(false, ByteOrder.BIG_ENDIAN), log, false))
+ .filters(new GridNioCodecFilter(new OdbcBufferedParser(false, ByteOrder.LITTLE_ENDIAN), log, false))
.directMode(false)
.idleTimeout(odbcCfg.getIdleTimeout())
.build();
http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index 2905ed7..37e81d0 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -172,8 +172,7 @@ namespace ignite
OdbcProtocolHeader hdr;
- // Lenght should has Big Endian byte order.
- hdr.len = htonl(static_cast<unsigned long>(len));
+ hdr.len = len;
int sent = socket.Send(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
@@ -210,9 +209,6 @@ namespace ignite
int received = socket.Receive(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
- // Lenght has Big Endian byte order.
- hdr.len = ntohl(hdr.len);
-
LOG_MSG("Received: %d\n", received);
if (received != sizeof(hdr))