You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/28 11:09:48 UTC
ignite git commit: IGNITE-4405: Hadoop: implemented "readLine" method
for HadoopDataInStream and HadoopDirectDataInput classes. This closes #1358.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 7d82d6a06 -> a61b0eaff
IGNITE-4405: Hadoop: implemented "readLine" method for HadoopDataInStream and HadoopDirectDataInput classes. This closes #1358.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a61b0eaf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a61b0eaf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a61b0eaf
Branch: refs/heads/ignite-2.0
Commit: a61b0eaff1817d84c0659e8a7e095f29e22800e1
Parents: 7d82d6a
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Wed Dec 28 14:09:38 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Dec 28 14:09:38 2016 +0300
----------------------------------------------------------------------
.../shuffle/direct/HadoopDirectDataInput.java | 34 +++-
.../shuffle/streams/HadoopDataInStream.java | 34 +++-
.../shuffle/streams/HadoopOffheapBuffer.java | 18 ++
.../streams/HadoopDataStreamSelfTest.java | 177 +++++++++++++++++--
4 files changed, 244 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
index e3a713a..6f0e2b0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.jetbrains.annotations.NotNull;
import java.io.DataInput;
@@ -150,8 +151,37 @@ public class HadoopDirectDataInput extends InputStream implements DataInput {
/** {@inheritDoc} */
@Override public String readLine() throws IOException {
- // TODO: Create ticket!
- throw new UnsupportedOperationException();
+ if (pos == buf.length)
+ return null;
+
+ SB sb = new SB();
+
+ while (pos < buf.length) {
+ char c = (char)(readByte() & 0xFF); // DataInput.readLine(): high 8 bits of each char must be zero.
+
+ switch (c) {
+ case '\n':
+ return sb.toString();
+
+ case '\r':
+ if (pos == buf.length)
+ return sb.toString();
+
+ c = (char)(readByte() & 0xFF);
+
+ if (c == '\n')
+ return sb.toString();
+ else
+ pos--; // Lone '\r' terminator: un-read the byte that follows it.
+
+ return sb.toString();
+
+ default:
+ sb.a(c);
+ }
+ }
+
+ return sb.toString();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
index 3b5fa15..261daee 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.SB;
/**
* Data input stream.
@@ -52,6 +53,7 @@ public class HadoopDataInStream extends InputStream implements DataInput {
/**
* @param size Size.
* @return Old pointer.
+ * @throws IOException On error.
*/
protected long move(long size) throws IOException {
long ptr = buf.move(size);
@@ -156,7 +158,37 @@ public class HadoopDataInStream extends InputStream implements DataInput {
/** {@inheritDoc} */
@Override public String readLine() throws IOException {
- throw new UnsupportedOperationException();
+ if (buf.remaining() == 0)
+ return null;
+
+ SB sb = new SB();
+
+ while (buf.remaining() > 0) {
+ char c = (char)(readByte() & 0xFF); // DataInput.readLine(): high 8 bits of each char must be zero.
+
+ switch (c) {
+ case '\n':
+ return sb.toString();
+
+ case '\r':
+ if (buf.remaining() == 0)
+ return sb.toString();
+
+ c = (char)(readByte() & 0xFF);
+
+ if (c == '\n')
+ return sb.toString();
+ else
+ buf.moveBackward(1); // Lone '\r' terminator: un-read the byte that follows it.
+
+ return sb.toString();
+
+ default:
+ sb.a(c);
+ }
+ }
+
+ return sb.toString();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
index acc9be6..d15e7eb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
@@ -106,6 +106,24 @@ public class HadoopOffheapBuffer {
}
/**
+ * @param size Number of bytes to move the current position back by; must be positive.
+ * @return Old position pointer, or {@code 0} (position unchanged) if the move would pass the buffer start.
+ */
+ public long moveBackward(long size) {
+ assert size > 0 : size;
+
+ long oldPos = posPtr;
+ long newPos = oldPos - size;
+
+ if (newPos < bufPtr)
+ return 0; // Would land before the buffer start: refuse and leave posPtr as is.
+
+ posPtr = newPos;
+
+ return oldPos;
+ }
+
+ /**
* @param ptr Pointer.
* @return {@code true} If the given pointer is inside of this buffer.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a61b0eaf/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
index 612e892..c7d4dce 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
@@ -17,30 +17,173 @@
package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutput;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
/**
*
*/
public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
+ /** Buffer size used by the stream tests. */ private static final int BUFF_SIZE = 4 * 1024;
+ /**
+ * @throws IOException If failed.
+ */
public void testStreams() throws IOException {
GridUnsafeMemory mem = new GridUnsafeMemory(0);
HadoopDataOutStream out = new HadoopDataOutStream(mem);
- int size = 4 * 1024;
+ final long ptr = mem.allocate(BUFF_SIZE);
- final long ptr = mem.allocate(size);
+ out.buffer().set(ptr, BUFF_SIZE);
- out.buffer().set(ptr, size);
+ write(out);
+ HadoopDataInStream in = new HadoopDataInStream(mem);
+
+ in.buffer().set(ptr, out.buffer().pointer() - ptr); // 2nd arg is a length (cf. set(ptr, BUFF_SIZE)), not an end address.
+
+ checkRead(in);
+ }
+
+ /**
+ * Checks that values written by {@link HadoopDirectDataOutput} are read back by {@link HadoopDirectDataInput}.
+ *
+ * @throws IOException If failed.
+ */
+ public void testDirectStreams() throws IOException {
+ HadoopDirectDataOutput dataOut = new HadoopDirectDataOutput(BUFF_SIZE);
+
+ write(dataOut);
+
+ byte[] written = Arrays.copyOf(dataOut.buffer(), dataOut.position());
+
+ checkRead(new HadoopDirectDataInput(written));
+ }
+
+ /**
+ * @throws IOException If failed.
+ */
+ public void testReadline() throws IOException {
+ checkReadLine("String1\rString2\r\nString3\nString4"); // All three terminators; last line unterminated.
+ checkReadLine("String1\rString2\r\nString3\nString4\r\n"); // Input ends with "\r\n".
+ checkReadLine("String1\rString2\r\nString3\nString4\r"); // Input ends with a lone '\r'.
+ checkReadLine("\nA\rB\r\nC\nD\n"); // Leading empty line ended by '\n'.
+ checkReadLine("\rA\rB\r\nC\nD\n"); // Leading empty line ended by '\r'.
+ checkReadLine("\r\nA\rB\r\nC\nD\n"); // Leading empty line ended by "\r\n".
+ checkReadLine("\r\r\nA\r\r\nC\nD\n"); // '\r' immediately followed by "\r\n".
+ checkReadLine("\r\r\r\n\n\n"); // Terminators only: a run of empty lines.
+ checkReadLine("\r\n"); // Single "\r\n".
+ checkReadLine("\r"); // Single '\r'.
+ checkReadLine("\n"); // Single '\n'.
+ }
+
+ /**
+ * @param val Text that both Hadoop inputs must split into the same lines as {@link DataInputStream}.
+ * @throws IOException On error.
+ */
+ private void checkReadLine(String val) throws IOException {
+ List<String> reference = readLineByDataInputStream(val);
+ List<String> fromInStream = readLineByHadoopDataInStream(val);
+ List<String> fromDirectInput = readLineByHadoopDirectDataInput(val);
+
+ assertEquals(reference, fromInStream);
+ assertEquals(reference, fromDirectInput);
+ }
+
+ /**
+ * @param val String value.
+ * @return Lines returned by successive {@link DataInputStream#readLine()} calls, in order.
+ * @throws IOException On error.
+ */
+ List<String> readLineByDataInputStream(String val) throws IOException {
+ ByteArrayOutputStream sink = new ByteArrayOutputStream();
+ sink.write(val.getBytes());
+ sink.close();
+
+ ByteArrayInputStream src = new ByteArrayInputStream(sink.toByteArray());
+
+ try (DataInputStream in = new DataInputStream(src)) {
+ return readLineStrings(in);
+ }
+ }
+
+ /**
+ * @param val String value.
+ * @return Lines returned by successive readLine() calls on a {@link HadoopDataInStream}, in order.
+ * @throws IOException On error.
+ */
+ List<String> readLineByHadoopDataInStream(String val) throws IOException {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+ final long ptr = mem.allocate(BUFF_SIZE);
+
+ try {
+ HadoopDataOutStream out = new HadoopDataOutStream(mem);
+ out.buffer().set(ptr, BUFF_SIZE);
+ out.write(val.getBytes());
+
+ HadoopDataInStream in = new HadoopDataInStream(mem);
+ in.buffer().set(ptr, out.buffer().pointer() - ptr);
+ return readLineStrings(in); // Fully materialized before the buffer is released below.
+ }
+ finally {
+ mem.release(ptr, BUFF_SIZE); // Off-heap memory is not reclaimed by GC; free it explicitly.
+ }
+ }
+
+ /**
+ * Splits {@code val} into lines using {@link HadoopDirectDataInput#readLine()}.
+ *
+ * @param val String value.
+ * @return Lines returned by successive readLine() calls, in order.
+ * @throws IOException On error.
+ */
+ List<String> readLineByHadoopDirectDataInput(String val) throws IOException {
+ HadoopDirectDataOutput out = new HadoopDirectDataOutput(BUFF_SIZE);
+
+ out.write(val.getBytes());
+
+ // Keep only the bytes actually written; the backing array may be larger.
+ byte[] written = Arrays.copyOf(out.buffer(), out.position());
+
+ return readLineStrings(new HadoopDirectDataInput(written));
+ }
+
+ /**
+ * @param in Data input.
+ * @return Every line {@code in.readLine()} yields, in order, up to the first {@code null}.
+ * @throws IOException On error.
+ */
+ @NotNull private List<String> readLineStrings(DataInput in) throws IOException {
+ List<String> lines = new ArrayList<>();
+
+ for (String line; (line = in.readLine()) != null; )
+ lines.add(line);
+
+ return lines;
+ }
+
+ /**
+ * @param out Data output to fill with the fixed value sequence that {@link #checkRead(DataInput)} verifies.
+ * @throws IOException On error.
+ */
+ private void write(DataOutput out) throws IOException {
out.writeBoolean(false);
out.writeBoolean(true);
out.writeBoolean(false);
@@ -84,20 +227,22 @@ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
out.writeLong(Long.MIN_VALUE);
out.writeLong(0);
out.writeLong(-1L);
- out.write(new byte[]{1,2,3});
- out.write(new byte[]{0,1,2,3}, 1, 2);
+ out.write(new byte[] {1, 2, 3});
+ out.write(new byte[] {0, 1, 2, 3}, 1, 2); // Writes only bytes 1 and 2 (offset 1, length 2).
out.writeUTF("mom washes rum");
+ }
- HadoopDataInStream in = new HadoopDataInStream(mem);
-
- in.buffer().set(ptr, out.buffer().pointer());
-
+ /**
+ * @param in Data input positioned at the values produced by {@link #write(DataOutput)}.
+ * @throws IOException On error.
+ */
+ private void checkRead(DataInput in) throws IOException {
assertEquals(false, in.readBoolean());
assertEquals(true, in.readBoolean());
assertEquals(false, in.readBoolean());
- assertEquals(17, in.read());
- assertEquals(121, in.read());
- assertEquals(0xfa, in.read());
+ assertEquals(17, in.readUnsignedByte()); // DataInput has no read(); this is its unsigned single-byte read.
+ assertEquals(121, in.readUnsignedByte());
+ assertEquals(0xfa, in.readUnsignedByte());
assertEquals(17, in.readByte());
assertEquals(121, in.readByte());
assertEquals((byte)0xfa, in.readByte());
@@ -138,15 +283,15 @@ public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
byte[] b = new byte[3];
- in.read(b);
+ in.readFully(b); // DataInput has no read(byte[]); readFully() reads exactly b.length bytes.
- assertTrue(Arrays.equals(new byte[]{1,2,3}, b));
+ assertTrue(Arrays.equals(new byte[] {1, 2, 3}, b));
b = new byte[4];
- in.read(b, 1, 2);
+ in.readFully(b, 1, 2);
- assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b));
+ assertTrue(Arrays.equals(new byte[] {0, 1, 2, 0}, b));
assertEquals("mom washes rum", in.readUTF());
}