You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/17 11:47:31 UTC

[4/6] ignite git commit: IGNITE-2627: Implemented OdbcNioServerBuffer and OdbcBufferedParser.

IGNITE-2627: Implemented OdbcNioServerBuffer and OdbcBufferedParser.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb0788c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb0788c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb0788c1

Branch: refs/heads/ignite-1786
Commit: bb0788c1128e7239c294fbc9a0ad17addd2026e7
Parents: a7ffce0
Author: isapego <is...@gridgain.com>
Authored: Tue Feb 16 17:37:26 2016 +0300
Committer: isapego <is...@gridgain.com>
Committed: Tue Feb 16 17:37:26 2016 +0300

----------------------------------------------------------------------
 .../processors/odbc/OdbcBufferedParser.java     |  96 ++++++++++++++
 .../processors/odbc/OdbcNioServerBuffer.java    | 129 +++++++++++++++++++
 .../internal/processors/odbc/OdbcProcessor.java |   2 +-
 modules/platforms/cpp/odbc/src/connection.cpp   |   6 +-
 4 files changed, 227 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java
new file mode 100644
index 0000000..4ce8a8a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This class implements stream parser based on {@link OdbcNioServerBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is prepended with
+ * 4-byte integer header containing message size. So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
+ *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
+ * </pre>
+ */
+public class OdbcBufferedParser implements GridNioParser {
+    /** Buffer metadata key. */
+    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Direct buffer alocation flag. */
+    private final boolean directBuf;
+
+    /** Message size byte order. */
+    private final ByteOrder order;
+
+    /**
+     * @param directBuf Direct buffer.
+     * @param order Message size byte order.
+     */
+    public OdbcBufferedParser(boolean directBuf, ByteOrder order) {
+        this.directBuf = directBuf;
+        this.order = order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        OdbcNioServerBuffer nioBuf = ses.meta(BUF_META_KEY);
+
+        // Decode for a given session is called per one thread, so there should not be any concurrency issues.
+        // However, we make some additional checks.
+        if (nioBuf == null) {
+            nioBuf = new OdbcNioServerBuffer(order);
+
+            OdbcNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
+
+            assert old == null;
+        }
+
+        return nioBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        byte[] msg0 = (byte[])msg;
+
+        ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length + 4) : ByteBuffer.allocate(msg0.length + 4);
+
+        res.order(order);
+
+        res.putInt(msg0.length);
+        res.put(msg0);
+
+        res.flip();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return OdbcBufferedParser.class.getSimpleName();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java
new file mode 100644
index 0000000..20cf4c5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * NIO server buffer.
+ */
+public class OdbcNioServerBuffer {
+    /** Current message data. */
+    private byte[] data;
+
+    /** Count of received bytes of the current message. */
+    private int cnt = -4;
+
+    /** Current message size. */
+    private int msgSize;
+
+    /** Message size byte order. */
+    private final ByteOrder order;
+
+    /**
+     * @param order Byte order.
+     */
+    public OdbcNioServerBuffer(ByteOrder order) {
+        this.order = order;
+    }
+
+    /**
+     * Reset buffer state.
+     */
+    public void reset() {
+        msgSize = 0;
+        cnt = -4;
+        data = null;
+    }
+
+    /**
+     * Checks whether the byte array is filled.
+     *
+     * @return Flag indicating whether byte array is filled or not.
+     */
+    public boolean isFilled() {
+        return cnt > 0 && cnt == msgSize;
+    }
+
+    /**
+     * Get data withing the buffer.
+     *
+     * @return Data.
+     */
+    public byte[] data() {
+        return data;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @return Message bytes or {@code null} if message is not fully read yet.
+     * @throws IgniteCheckedException If failed to parse message.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException {
+        if (cnt < 0) {
+            for (; cnt < 0 && buf.hasRemaining(); cnt++) {
+                if (order == ByteOrder.BIG_ENDIAN)
+                    msgSize = (msgSize << 8) | buf.get() & 0xFF;
+                else
+                    msgSize |= (buf.get() & 0xFF) << (8*(4 + cnt));
+            }
+
+            if (cnt < 0)
+                return null;
+
+            // If count is 0 then message size should be inited.
+            if (msgSize <= 0)
+                throw new IgniteCheckedException("Invalid message size: " + msgSize);
+
+            data = new byte[msgSize];
+        }
+
+        assert msgSize > 0;
+        assert cnt >= 0;
+
+        int remaining = buf.remaining();
+
+        // If there are more bytes in buffer.
+        if (remaining > 0) {
+            int missing = msgSize - cnt;
+
+            // Read only up to message size.
+            if (missing > 0) {
+                int len = missing < remaining ? missing : remaining;
+
+                buf.get(data, cnt, len);
+
+                cnt += len;
+            }
+        }
+
+        if (cnt == msgSize) {
+            byte[] data0 = data;
+
+            reset();
+
+            return data0;
+        }
+        else
+            return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index a3ed422..831ef02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -84,7 +84,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
                     .socketSendBufferSize(odbcCfg.getSendBufferSize())
                     .socketReceiveBufferSize(odbcCfg.getReceiveBufferSize())
                     .sendQueueLimit(odbcCfg.getSendQueueLimit())
-                    .filters(new GridNioCodecFilter(new GridBufferedParser(false, ByteOrder.BIG_ENDIAN), log, false))
+                    .filters(new GridNioCodecFilter(new OdbcBufferedParser(false, ByteOrder.LITTLE_ENDIAN), log, false))
                     .directMode(false)
                     .idleTimeout(odbcCfg.getIdleTimeout())
                     .build();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index 2905ed7..37e81d0 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -172,8 +172,7 @@ namespace ignite
 
             OdbcProtocolHeader hdr;
 
-            // Lenght should has Big Endian byte order.
-            hdr.len = htonl(static_cast<unsigned long>(len));
+            hdr.len = len;
 
             int sent = socket.Send(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
 
@@ -210,9 +209,6 @@ namespace ignite
 
             int received = socket.Receive(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
 
-            // Lenght has Big Endian byte order.
-            hdr.len = ntohl(hdr.len);
-
             LOG_MSG("Received: %d\n", received);
 
             if (received != sizeof(hdr))