Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/28 11:09:48 UTC

ignite git commit: IGNITE-4405: Hadoop: implemented "readLine" method for HadoopDataInStream and HadoopDirectDataInput classes. This closes #1358.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 7d82d6a06 -> a61b0eaff


IGNITE-4405: Hadoop: implemented "readLine" method for HadoopDataInStream and HadoopDirectDataInput classes. This closes #1358.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a61b0eaf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a61b0eaf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a61b0eaf

Branch: refs/heads/ignite-2.0
Commit: a61b0eaff1817d84c0659e8a7e095f29e22800e1
Parents: 7d82d6a
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Wed Dec 28 14:09:38 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Dec 28 14:09:38 2016 +0300

----------------------------------------------------------------------
 .../shuffle/direct/HadoopDirectDataInput.java   |  34 +++-
 .../shuffle/streams/HadoopDataInStream.java     |  34 +++-
 .../shuffle/streams/HadoopOffheapBuffer.java    |  18 ++
 .../streams/HadoopDataStreamSelfTest.java       | 177 +++++++++++++++++--
 4 files changed, 244 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
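For reference, both new readLine() implementations follow the java.io.DataInput line-terminator contract that the updated self-test checks against: a line ends at '\n', at '\r', or at the pair "\r\n"; the terminator is not included in the returned string; and null is returned once the input is exhausted. Below is a minimal sketch of that reference behaviour, assuming only the JDK: it uses the (deprecated) DataInputStream.readLine() that the self-test compares against, and the sample text is taken from that test; the class name is illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class ReadLineContractSketch {
        public static void main(String[] args) throws IOException {
            // '\r', '\n' and "\r\n" all end a line; terminators are stripped from the result.
            byte[] data = "String1\rString2\r\nString3\nString4".getBytes();

            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                // Prints String1, String2, String3, String4 (one per line); then readLine() returns null.
                for (String line = in.readLine(); line != null; line = in.readLine())
                    System.out.println(line);
            }
        }
    }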


http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
index e3a713a..6f0e2b0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
 
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.DataInput;
@@ -150,8 +151,37 @@ public class HadoopDirectDataInput extends InputStream implements DataInput {
 
     /** {@inheritDoc} */
     @Override public String readLine() throws IOException {
-        // TODO: Create ticket!
-        throw new UnsupportedOperationException();
+        if (pos == buf.length)
+            return null;
+
+        SB sb = new SB();
+
+        while (pos < buf.length) {
+            char c = (char)readByte();
+
+            switch (c) {
+                case '\n':
+                    return sb.toString();
+
+                case '\r':
+                    if (pos == buf.length)
+                        return sb.toString();
+
+                    c = (char)readByte();
+
+                    if (c == '\n')
+                        return sb.toString();
+                    else
+                        pos--;
+
+                    return sb.toString();
+
+                default:
+                    sb.a(c);
+            }
+        }
+
+        return sb.toString();
     }
 
     /** {@inheritDoc} */

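A minimal usage sketch of the new method, mirroring what the self-test below does: HadoopDirectDataInput wraps the heap byte[] written by HadoopDirectDataOutput, and readLine() now walks that array instead of throwing UnsupportedOperationException. Buffer size, sample text and the class name are illustrative only.

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
    import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutput;

    public class DirectReadLineSketch {
        public static void main(String[] args) throws IOException {
            // Write a couple of lines into the direct (heap byte[]) output buffer.
            HadoopDirectDataOutput out = new HadoopDirectDataOutput(4 * 1024);

            out.write("first line\r\nsecond line\n".getBytes());

            // Wrap only the written prefix of the underlying array, as the self-test does.
            byte[] buf = Arrays.copyOf(out.buffer(), out.position());

            HadoopDirectDataInput in = new HadoopDirectDataInput(buf);

            // Prints "first line" and "second line"; readLine() returns null once the buffer is exhausted.
            for (String line = in.readLine(); line != null; line = in.readLine())
                System.out.println(line);
        }
    }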
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
index 3b5fa15..261daee 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.SB;
 
 /**
  * Data input stream.
@@ -52,6 +53,7 @@ public class HadoopDataInStream extends InputStream implements DataInput {
     /**
      * @param size Size.
      * @return Old pointer.
+     * @throws IOException On error.
      */
     protected long move(long size) throws IOException {
         long ptr = buf.move(size);
@@ -156,7 +158,37 @@ public class HadoopDataInStream extends InputStream implements DataInput {
 
     /** {@inheritDoc} */
     @Override public String readLine() throws IOException {
-        throw new UnsupportedOperationException();
+        if (buf.remaining() == 0)
+            return null;
+
+        SB sb = new SB();
+
+        while (buf.remaining() > 0) {
+            char c = (char)readByte();
+
+            switch (c) {
+                case '\n':
+                    return sb.toString();
+
+                case '\r':
+                    if (buf.remaining() == 0)
+                        return sb.toString();
+
+                    c = (char)readByte();
+
+                    if (c == '\n')
+                        return sb.toString();
+                    else
+                        buf.moveBackward(1);
+
+                    return sb.toString();
+
+                default:
+                    sb.a(c);
+            }
+        }
+
+        return sb.toString();
     }
 
     /** {@inheritDoc} */

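The same sketch for the off-heap stream pair, mirroring readLineByHadoopDataInStream() in the self-test below: the reader is bound to the region written by HadoopDataOutStream, and readLine() relies on remaining() plus the new moveBackward() for the one-byte push-back after a lone '\r'. The 4 KB region size, sample text and class name are illustrative only, and the allocated memory is not released here.

    import java.io.IOException;

    import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
    import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
    import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;

    public class OffheapReadLineSketch {
        public static void main(String[] args) throws IOException {
            GridUnsafeMemory mem = new GridUnsafeMemory(0);

            HadoopDataOutStream out = new HadoopDataOutStream(mem);

            // Back the output stream with a 4 KB off-heap region.
            long ptr = mem.allocate(4 * 1024);

            out.buffer().set(ptr, 4 * 1024);

            out.write("first line\r\nsecond line\n".getBytes());

            // Expose only the written prefix of the region to the reader.
            HadoopDataInStream in = new HadoopDataInStream(mem);

            in.buffer().set(ptr, out.buffer().pointer() - ptr);

            // Prints "first line" and "second line"; readLine() returns null at the end of the region.
            for (String line = in.readLine(); line != null; line = in.readLine())
                System.out.println(line);
        }
    }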
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
index acc9be6..d15e7eb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
@@ -106,6 +106,24 @@ public class HadoopOffheapBuffer {
     }
 
     /**
+     * @param size Number of bytes to move the position backward.
+     * @return Old position pointer, or {@code 0} if the move would go beyond the beginning of the buffer.
+     */
+    public long moveBackward(long size) {
+        assert size > 0 : size;
+
+        long oldPos = posPtr;
+        long newPos = oldPos - size;
+
+        if (newPos < bufPtr)
+            return 0;
+
+        posPtr = newPos;
+
+        return oldPos;
+    }
+
+    /**
      * @param ptr Pointer.
      * @return {@code true} If the given pointer is inside of this buffer.
      */

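A hedged sketch of the new moveBackward() contract, as used by HadoopDataInStream.readLine() for its one-byte push-back: the position pointer is rewound by size bytes and the old position is returned, unless the rewind would cross the start of the buffer, in which case nothing moves and 0 is returned. The snippet below is a hypothetical standalone mirror of that arithmetic; bufPtr and posPtr stand in for the buffer's private fields and no real HadoopOffheapBuffer is constructed.

    public class MoveBackwardContractSketch {
        public static void main(String[] args) {
            long bufPtr = 1_000; // Start of the off-heap region.
            long posPtr = 1_001; // Current position: one byte already consumed.

            // Rewinding by 1 stays inside the buffer: the old position (1001) is returned.
            // This is the push-back readLine() performs after peeking past a lone '\r'.
            System.out.println(moveBackward(bufPtr, posPtr, 1)); // 1001

            // Rewinding by 2 would cross the start of the buffer: 0 is returned, position unchanged.
            System.out.println(moveBackward(bufPtr, posPtr, 2)); // 0
        }

        /** Same arithmetic as HadoopOffheapBuffer.moveBackward(), minus the field update. */
        static long moveBackward(long bufPtr, long posPtr, long size) {
            assert size > 0 : size;

            return posPtr - size < bufPtr ? 0 : posPtr;
        }
    }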
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
index 612e892..c7d4dce 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
@@ -17,30 +17,173 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 
+import java.util.List;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutput;
 import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
 import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
 public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
+    private static final int BUFF_SIZE = 4 * 1024;
 
+    /**
+     * @throws IOException If failed.
+     */
     public void testStreams() throws IOException {
         GridUnsafeMemory mem = new GridUnsafeMemory(0);
 
         HadoopDataOutStream out = new HadoopDataOutStream(mem);
 
-        int size = 4 * 1024;
+        final long ptr = mem.allocate(BUFF_SIZE);
 
-        final long ptr = mem.allocate(size);
+        out.buffer().set(ptr, BUFF_SIZE);
 
-        out.buffer().set(ptr, size);
+        write(out);
 
+        HadoopDataInStream in = new HadoopDataInStream(mem);
+
+        in.buffer().set(ptr, out.buffer().pointer() - ptr);
+
+        checkRead(in);
+    }
+
+    /**
+     * @throws IOException If failed.
+     */
+    public void testDirectStreams() throws IOException {
+        HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE);
+
+        write(out);
+
+        byte [] inBuf = Arrays.copyOf(out.buffer(), out.position());
+
+        HadoopDirectDataInput in = new HadoopDirectDataInput(inBuf);
+
+        checkRead(in);
+    }
+
+    /**
+     * @throws IOException If failed.
+     */
+    public void testReadline() throws IOException {
+        checkReadLine("String1\rString2\r\nString3\nString4");
+        checkReadLine("String1\rString2\r\nString3\nString4\r\n");
+        checkReadLine("String1\rString2\r\nString3\nString4\r");
+        checkReadLine("\nA\rB\r\nC\nD\n");
+        checkReadLine("\rA\rB\r\nC\nD\n");
+        checkReadLine("\r\nA\rB\r\nC\nD\n");
+        checkReadLine("\r\r\nA\r\r\nC\nD\n");
+        checkReadLine("\r\r\r\n\n\n");
+        checkReadLine("\r\n");
+        checkReadLine("\r");
+        checkReadLine("\n");
+    }
+
+    /**
+     * @param val String value.
+     * @throws IOException On error.
+     */
+    private void checkReadLine(String val) throws IOException {
+        List<String> expected = readLineByDataInputStream(val);
+        List<String> dataInp = readLineByHadoopDataInStream(val);
+        List<String> directDataInp = readLineByHadoopDirectDataInput(val);
+
+        assertEquals(expected, dataInp);
+        assertEquals(expected, directDataInp);
+    }
+
+    /**
+     * @param val String value.
+     * @return List of strings returned by readLine().
+     * @throws IOException On error.
+     */
+    List<String> readLineByDataInputStream(String val) throws IOException {
+        ByteArrayOutputStream byteArrayOs = new ByteArrayOutputStream();
+
+        byteArrayOs.write(val.getBytes());
+
+        byteArrayOs.close();
+
+        try (DataInputStream in =  new DataInputStream(new ByteArrayInputStream(byteArrayOs.toByteArray()))) {
+            return readLineStrings(in);
+        }
+    }
+
+    /**
+     * @param val String value.
+     * @return List of strings returned by readLine().
+     * @throws IOException On error.
+     */
+    List<String> readLineByHadoopDataInStream(String val) throws IOException {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        HadoopDataOutStream out = new HadoopDataOutStream(mem);
+
+        final long ptr = mem.allocate(BUFF_SIZE);
+
+        out.buffer().set(ptr, BUFF_SIZE);
+
+        out.write(val.getBytes());
+
+        HadoopDataInStream in = new HadoopDataInStream(mem);
+
+        in.buffer().set(ptr, out.buffer().pointer() - ptr);
+
+        return readLineStrings(in);
+    }
+
+    /**
+     * @param val String value.
+     * @return List of strings returned by readLine().
+     * @throws IOException On error.
+     */
+    List<String> readLineByHadoopDirectDataInput(String val) throws IOException {
+
+        HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE);
+
+        out.write(val.getBytes());
+
+        byte [] inBuf = Arrays.copyOf(out.buffer(), out.position());
+
+        HadoopDirectDataInput in = new HadoopDirectDataInput(inBuf);
+
+        return readLineStrings(in);
+    }
+
+    /**
+     * @param in Data input.
+     * @return List of strings returned by readLine().
+     * @throws IOException On error.
+     */
+    @NotNull private List<String> readLineStrings(DataInput in) throws IOException {
+        List<String> strs = new ArrayList<>();
+
+        for (String str = in.readLine(); str != null; str = in.readLine())
+            strs.add(str);
+
+        return strs;
+    }
+
+    /**
+     * @param out Data output.
+     * @throws IOException On error.
+     */
+    private void write(DataOutput out) throws IOException {
         out.writeBoolean(false);
         out.writeBoolean(true);
         out.writeBoolean(false);
@@ -84,20 +227,22 @@ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
         out.writeLong(Long.MIN_VALUE);
         out.writeLong(0);
         out.writeLong(-1L);
-        out.write(new byte[]{1,2,3});
-        out.write(new byte[]{0,1,2,3}, 1, 2);
+        out.write(new byte[] {1, 2, 3});
+        out.write(new byte[] {0, 1, 2, 3}, 1, 2);
         out.writeUTF("mom washes rum");
+    }
 
-        HadoopDataInStream in = new HadoopDataInStream(mem);
-
-        in.buffer().set(ptr, out.buffer().pointer());
-
+    /**
+     * @param in Data input.
+     * @throws IOException On error.
+     */
+    private void checkRead(DataInput in) throws IOException {
         assertEquals(false, in.readBoolean());
         assertEquals(true, in.readBoolean());
         assertEquals(false, in.readBoolean());
-        assertEquals(17, in.read());
-        assertEquals(121, in.read());
-        assertEquals(0xfa, in.read());
+        assertEquals(17, in.readUnsignedByte());
+        assertEquals(121, in.readUnsignedByte());
+        assertEquals(0xfa, in.readUnsignedByte());
         assertEquals(17, in.readByte());
         assertEquals(121, in.readByte());
         assertEquals((byte)0xfa, in.readByte());
@@ -138,15 +283,15 @@ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
 
         byte[] b = new byte[3];
 
-        in.read(b);
+        in.readFully(b);
 
-        assertTrue(Arrays.equals(new byte[]{1,2,3}, b));
+        assertTrue(Arrays.equals(new byte[] {1, 2, 3}, b));
 
         b = new byte[4];
 
-        in.read(b, 1, 2);
+        in.readFully(b, 1, 2);
 
-        assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b));
+        assertTrue(Arrays.equals(new byte[] {0, 1, 2, 0}, b));
 
         assertEquals("mom washes rum", in.readUTF());
     }