You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 11:44:05 UTC

arrow git commit: ARROW-506: Java: Implement echo server for integration testing.

Repository: arrow
Updated Branches:
  refs/heads/master 69cdbd8ce -> c327b5fd2


ARROW-506: Java: Implement echo server for integration testing.

While implementing this, it became clear that the stream writer needs an API to
signal end-of-stream (EOS) without closing the underlying stream. Currently the
reader expects each message to begin with a 4-byte size prefix. This patch
proposes allowing a size of 0 to indicate EOS.

Author: Nong Li <no...@gmail.com>

Closes #295 from nongli/echo_server and squashes the following commits:

c115b02 [Nong Li] Add license header.
a3a50ca [Nong Li] ARROW-506: Java: Implement echo server for integration testing.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c327b5fd
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c327b5fd
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c327b5fd

Branch: refs/heads/master
Commit: c327b5fd2c35788c90b3fef2bc7b5faf89c07e49
Parents: 69cdbd8
Author: Nong Li <no...@gmail.com>
Authored: Mon Jan 23 06:43:59 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 06:43:59 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/arrow/tools/EchoServer.java | 130 +++++++++++++++++++
 .../org/apache/arrow/tools/EchoServerTest.java  | 129 ++++++++++++++++++
 .../arrow/vector/stream/ArrowStreamWriter.java  |  14 +-
 .../arrow/vector/stream/MessageSerializer.java  |   7 +-
 .../apache/arrow/vector/file/TestArrowFile.java |   4 +-
 .../arrow/vector/stream/TestArrowStream.java    |   4 +-
 .../vector/stream/TestArrowStreamPipe.java      |   2 +-
 7 files changed, 278 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
new file mode 100644
index 0000000..c00620e
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.tools;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class EchoServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);
+
+  /**
+   * Set by {@link #close()} so that {@link #run()} can tell a deliberate shutdown apart from an
+   * unexpected socket failure. close() is typically called from a different thread than the one
+   * blocked in run(), so the flag must be volatile for the write to be reliably visible.
+   */
+  private volatile boolean closed = false;
+  private final ServerSocket serverSocket;
+
+  /**
+   * Binds the server socket.
+   *
+   * @param port the TCP port to listen on; 0 lets the OS pick a free port (see {@link #port()})
+   * @throws IOException if the socket cannot be bound
+   */
+  public EchoServer(int port) throws IOException {
+    LOGGER.info("Starting echo server.");
+    serverSocket = new ServerSocket(port);
+    LOGGER.info("Running echo server on port: {}", port());
+  }
+
+  /** Returns the local port the server socket is bound to. */
+  public int port() {
+    return serverSocket.getLocalPort();
+  }
+
+  /**
+   * Serves a single client: reads a complete Arrow stream from the socket and writes the same
+   * schema and record batches back on that socket.
+   */
+  public static class ClientConnection implements AutoCloseable {
+    public final Socket socket;
+
+    public ClientConnection(Socket socket) {
+      this.socket = socket;
+    }
+
+    /**
+     * Reads the client's stream until end-of-stream, then echoes it back followed by an explicit
+     * end-of-stream marker.
+     *
+     * <p>All batches are buffered before anything is written, so the client must end its stream
+     * before it receives a response.
+     *
+     * @throws IOException if reading from or writing to the socket fails
+     * @throws IllegalStateException if the number of bytes written back differs from the number
+     *     of bytes read
+     */
+    public void run() throws IOException {
+      List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
+      // The allocator is declared first so it is closed last, after the reader and every batch
+      // allocated from it have been released.
+      try (
+        BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        InputStream in = socket.getInputStream();
+        OutputStream out = socket.getOutputStream();
+        ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
+      ) {
+        try {
+          // Read the entire input stream; nextRecordBatch() returns null at end-of-stream.
+          reader.init();
+          for (ArrowRecordBatch batch = reader.nextRecordBatch(); batch != null;
+              batch = reader.nextRecordBatch()) {
+            batches.add(batch);
+          }
+          LOGGER.info("Received {} batches", batches.size());
+
+          // Write it back.
+          try (ArrowStreamWriter writer = new ArrowStreamWriter(out, reader.getSchema())) {
+            for (ArrowRecordBatch batch : batches) {
+              writer.writeRecordBatch(batch);
+            }
+            writer.end();
+            Preconditions.checkState(reader.bytesRead() == writer.bytesWritten(),
+                "Wrote %s bytes back but read %s bytes", writer.bytesWritten(), reader.bytesRead());
+          }
+          LOGGER.info("Done writing stream back.");
+        } finally {
+          // Each batch holds buffers from the allocator; release them so memory is not leaked for
+          // every client served.
+          for (ArrowRecordBatch batch : batches) {
+            batch.close();
+          }
+        }
+      }
+    }
+
+    /** Closes the client socket. */
+    @Override
+    public void close() throws IOException {
+      socket.close();
+    }
+  }
+
+  /**
+   * Accepts and serves clients one at a time until {@link #close()} is called.
+   *
+   * <p>A failure while serving one client is logged and the server moves on to the next client,
+   * so a single bad stream cannot stop the server.
+   *
+   * @throws IOException if accepting a connection fails for any reason other than shutdown
+   */
+  public void run() throws IOException {
+    try {
+      while (!closed) {
+        LOGGER.info("Waiting to accept new client connection.");
+        Socket clientSocket = serverSocket.accept();
+        LOGGER.info("Accepted new client connection.");
+        try (ClientConnection client = new ClientConnection(clientSocket)) {
+          try {
+            client.run();
+          } catch (IOException | RuntimeException e) {
+            // Runtime failures (e.g. the byte-count check in ClientConnection.run() or a malformed
+            // message) are per-client problems; they must not escape and end the accept loop.
+            LOGGER.warn("Error handling client connection.", e);
+          }
+        }
+        LOGGER.info("Closed connection with client");
+      }
+    } catch (java.net.SocketException ex) {
+      // accept() throws once close() has closed the server socket: that is the normal shutdown
+      // path. Otherwise it is a real error.
+      if (!closed) {
+        throw ex;
+      }
+    } finally {
+      serverSocket.close();
+      LOGGER.info("Server closed.");
+    }
+  }
+
+  /**
+   * Stops the server. Intended to be called from another thread: closing the server socket makes
+   * a pending {@code accept()} in {@link #run()} fail, which run() treats as shutdown.
+   */
+  public void close() throws IOException {
+    closed = true;
+    serverSocket.close();
+  }
+
+  /**
+   * Runs the echo server in the foreground.
+   *
+   * @param args optional single argument: the port to listen on (defaults to 8080)
+   */
+  public static void main(String[] args) throws Exception {
+    int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
+    new EchoServer(port).run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
new file mode 100644
index 0000000..48d6162
--- /dev/null
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.tools;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class EchoServerTest {
+  /** Allocates a buffer from {@code alloc} and fills it with {@code bytes}. */
+  public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) {
+    ArrowBuf buffer = alloc.buffer(bytes.length);
+    buffer.writeBytes(bytes);
+    return buffer;
+  }
+
+  /** Copies the readable bytes of {@code buf} into a new array, advancing its reader index. */
+  public static byte[] array(ArrowBuf buf) {
+    byte[] bytes = new byte[buf.readableBytes()];
+    buf.readBytes(bytes);
+    return bytes;
+  }
+
+  /**
+   * Sends {@code batches} to the echo server on {@code serverPort} and checks that the same schema
+   * and batches come back, followed by end-of-stream.
+   */
+  private void testEchoServer(int serverPort, Schema schema, List<ArrowRecordBatch> batches)
+      throws UnknownHostException, IOException {
+    BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+    try (Socket socket = new Socket("localhost", serverPort);
+        ArrowStreamWriter writer = new ArrowStreamWriter(socket.getOutputStream(), schema);
+        ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) {
+      // Mark the end of the request with end() rather than close(): the socket is still needed
+      // to read the echoed response.
+      for (ArrowRecordBatch batch : batches) {
+        writer.writeRecordBatch(batch);
+      }
+      writer.end();
+
+      reader.init();
+      assertEquals(schema, reader.getSchema());
+      for (int i = 0; i < batches.size(); i++) {
+        ArrowRecordBatch expected = batches.get(i);
+        // Echoed batches own buffers from alloc; close each one once it has been compared.
+        try (ArrowRecordBatch result = reader.nextRecordBatch()) {
+          assertTrue("Missing echoed batch " + i, result != null);
+          assertEquals(expected.getBuffers().size(), result.getBuffers().size());
+          for (int j = 0; j < expected.getBuffers().size(); j++) {
+            assertTrue("Buffer " + j + " of batch " + i + " differs",
+                expected.getBuffers().get(j).compareTo(result.getBuffers().get(j)) == 0);
+          }
+        }
+      }
+      assertTrue("Expected end of stream", reader.nextRecordBatch() == null);
+      assertEquals(writer.bytesWritten(), reader.bytesRead());
+    }
+  }
+
+  @Test
+  public void basicTest() throws InterruptedException, IOException {
+    final EchoServer server = new EchoServer(0);
+    int serverPort = server.port();
+    // Capture a server-thread failure so the test reports it instead of only printing it.
+    final IOException[] serverFailure = new IOException[1];
+    Thread serverThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          server.run();
+        } catch (IOException e) {
+          serverFailure[0] = e;
+        }
+      }
+    };
+    serverThread.start();
+
+    try {
+      BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+      byte[] validity = new byte[] { (byte)255, 0};
+      byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+      ArrowBuf validityb = buf(alloc, validity);
+      ArrowBuf valuesb =  buf(alloc, values);
+      ArrowRecordBatch batch = new ArrowRecordBatch(
+          16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
+
+      Schema schema = new Schema(asList(new Field(
+          "testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
+
+      // Try an empty stream, just the header.
+      testEchoServer(serverPort, schema, new ArrayList<ArrowRecordBatch>());
+
+      // Try with one batch.
+      List<ArrowRecordBatch> batches = new ArrayList<>();
+      batches.add(batch);
+      testEchoServer(serverPort, schema, batches);
+
+      // Try with a few
+      for (int i = 0; i < 10; i++) {
+        batches.add(batch);
+      }
+      testEchoServer(serverPort, schema, batches);
+    } finally {
+      // Always stop the server, even after a failed assertion, so its thread does not outlive
+      // the test.
+      server.close();
+      serverThread.join();
+    }
+    // join() makes the server thread's write to serverFailure visible here.
+    if (serverFailure[0] != null) {
+      throw serverFailure[0];
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
index 06acf9f..60dc586 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
@@ -35,14 +35,14 @@ public class ArrowStreamWriter implements AutoCloseable {
    * Creates the stream writer. non-blocking.
-   * totalBatches can be set if the writer knows beforehand. Can be -1 if unknown.
+   * The stream can be ended explicitly with end(), or by closing the writer.
    */
-  public ArrowStreamWriter(WritableByteChannel out, Schema schema, int totalBatches) {
+  public ArrowStreamWriter(WritableByteChannel out, Schema schema) {
+    // Nothing is written here; the schema header is sent later via checkAndSendHeader().
+    // The WriteChannel wrapper's position is what bytesWritten() reports.
     this.out = new WriteChannel(out);
     this.schema = schema;
   }
 
+  /**
+   * Convenience constructor: adapts {@code out} with
+   * {@link Channels#newChannel(OutputStream)} and delegates to the channel constructor.
+   */
-  public ArrowStreamWriter(OutputStream out, Schema schema, int totalBatches)
+  public ArrowStreamWriter(OutputStream out, Schema schema)
       throws IOException {
-    this(Channels.newChannel(out), schema, totalBatches);
+    this(Channels.newChannel(out), schema);
   }
 
+  /** Returns the current position of the underlying WriteChannel (presumably total bytes written). */
   public long bytesWritten() { return out.getCurrentPosition(); }
@@ -53,6 +53,14 @@ public class ArrowStreamWriter implements AutoCloseable {
     MessageSerializer.serialize(out, batch);
   }
 
+  /**
+   * Writes an explicit end-of-stream marker: a message length of zero, encoded as a 4-byte
+   * little-endian int. Readers treat a zero length as end of stream, so the underlying channel
+   * can stay open afterwards (e.g. to read a reply on the same socket). Calling this is
+   * optional; the writer can also simply be closed.
+   */
+  public void end() throws IOException {
+    // The schema header must precede the marker, even when no batch has been written.
+    checkAndSendHeader();
+    final int endOfStreamLength = 0;
+    out.writeIntLittleEndian(endOfStreamLength);
+  }
+
   @Override
   public void close() throws IOException {
     // The header might not have been sent if this is an empty stream. Send it even in

http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index 6e22dbd..7ab740c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -235,11 +235,10 @@ public class MessageSerializer {
   private static Message deserializeMessage(ReadChannel in, byte headerType) throws IOException {
     // Read the message size. There is an i32 little endian prefix.
     ByteBuffer buffer = ByteBuffer.allocate(4);
-    if (in.readFully(buffer) != 4) {
-      return null;
-    }
-
+    if (in.readFully(buffer) != 4) return null;
     int messageLength = bytesToInt(buffer.array());
+    if (messageLength == 0) return null;
+
     buffer = ByteBuffer.allocate(messageLength);
     if (in.readFully(buffer) != messageLength) {
       throw new IOException(

http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index 9b99144..a83a283 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -232,7 +232,7 @@ public class TestArrowFile extends BaseFileTest {
       Schema schema = vectorUnloader0.getSchema();
       Assert.assertEquals(2, schema.getFields().size());
       try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
-          ArrowStreamWriter streamWriter = new ArrowStreamWriter(stream, schema, 2)) {
+          ArrowStreamWriter streamWriter = new ArrowStreamWriter(stream, schema)) {
         try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) {
           Assert.assertEquals("RB #0", counts[0], recordBatch.getLength());
           arrowWriter.writeRecordBatch(recordBatch);
@@ -399,7 +399,7 @@ public class TestArrowFile extends BaseFileTest {
     // Also try serializing to the stream writer.
     if (outStream != null) {
       try (
-          ArrowStreamWriter arrowWriter = new ArrowStreamWriter(outStream, schema, -1);
+          ArrowStreamWriter arrowWriter = new ArrowStreamWriter(outStream, schema);
           ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
           ) {
         arrowWriter.writeRecordBatch(recordBatch);

http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
index ba1cdae..725272a 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
@@ -42,7 +42,7 @@ public class TestArrowStream extends BaseFileTest {
 
     // Write the stream.
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema, -1)) {
+    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) {
     }
 
     ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
@@ -66,7 +66,7 @@ public class TestArrowStream extends BaseFileTest {
     BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     long bytesWritten = 0;
-    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema, numBatches)) {
+    try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema)) {
       ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
       ArrowBuf valuesb =  MessageSerializerTest.buf(alloc, values);
       for (int i = 0; i < numBatches; i++) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/c327b5fd/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
index e187fa5..a0a7ffa 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
@@ -47,7 +47,7 @@ public class TestArrowStreamPipe {
+    /** Stores the batch count and creates an ArrowStreamWriter over {@code sinkChannel}. */
     public WriterThread(int numBatches, WritableByteChannel sinkChannel)
         throws IOException {
       this.numBatches = numBatches;
-      writer = new ArrowStreamWriter(sinkChannel, schema, -1);
+      writer = new ArrowStreamWriter(sinkChannel, schema);
     }
 
     @Override