You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ch...@apache.org on 2015/12/25 16:29:23 UTC

tajo git commit: TAJO-1944: Support text resultset for REST

Repository: tajo
Updated Branches:
  refs/heads/master 1f9ae1da0 -> 62cc6f686


TAJO-1944: Support text resultset for REST

Closes #876


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/62cc6f68
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/62cc6f68
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/62cc6f68

Branch: refs/heads/master
Commit: 62cc6f686021c91c85edcde00383297b711af248
Parents: 1f9ae1d
Author: charsyam <ch...@apache.org>
Authored: Sat Dec 26 00:27:29 2015 +0900
Committer: charsyam <ch...@apache.org>
Committed: Sat Dec 26 00:27:29 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../rs/resources/TestQueryResultResource.java   | 133 ++++++++++++++++---
 .../exec/NonForwardQueryResultFileScanner.java  |  31 +++++
 .../exec/NonForwardQueryResultScanner.java      |   3 +
 .../NonForwardQueryResultSystemScanner.java     |  32 +++++
 .../tajo/ws/rs/annotation/RestReturnType.java   |  34 +++++
 .../ws/rs/resources/QueryResultResource.java    |  82 +++++-------
 .../outputs/AbstractStreamingOutput.java        |  43 ++++++
 .../outputs/BinaryStreamingOutput.java          | 102 ++++++++++++++
 .../resources/outputs/CSVStreamingOutput.java   | 129 ++++++++++++++++++
 .../rs/resources/outputs/RestOutputFactory.java |  85 ++++++++++++
 11 files changed, 611 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c02bfc7..fea896c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1944: Support text resultset for REST (DaeMyung)
+
     TAJO-1950: Query master uses too much memory during range shuffle. (jihoon)
 
     TAJO-1858: Aligning error message in execute query page of web UI is needed.

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java b/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
index 2ab1add..26fa011 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.ws.rs.resources;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -44,6 +43,7 @@ import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -52,7 +52,6 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.InputStream;
 import java.net.URI;
-import java.security.MessageDigest;
 import java.util.List;
 
 import static org.apache.tajo.exception.ErrorUtil.isOk;
@@ -65,7 +64,6 @@ public class TestQueryResultResource extends QueryTestCaseBase {
   private Client restClient;
 
   private static final String tajoSessionIdHeaderName = "X-Tajo-Session";
-  private static final String tajoDigestHeaderName = "X-Tajo-Digest";
 	private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
 	private static final String tajoCountHeaderName = "X-Tajo-Count";
 	private static final String tajoEOSHeaderName = "X-Tajo-EOS";
@@ -176,7 +174,7 @@ public class TestQueryResultResource extends QueryTestCaseBase {
   }
 
   @Test
-  public void testGetQueryResultSet() throws Exception {
+  public void testGetQueryResultSetWithBinary() throws Exception {
     String sessionId = generateNewSessionAndGetId();
     URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
     URI queryResultURI = new URI(queryIdURI + "/result");
@@ -199,11 +197,10 @@ public class TestQueryResultResource extends QueryTestCaseBase {
     Response queryResultSetResponse = restClient.target(queryResultSetURI)
         .queryParam("count", 100)
         .request().header(tajoSessionIdHeaderName, sessionId)
+        .header(HttpHeaders.ACCEPT, "application/octet-stream")
         .get();
 
     assertNotNull(queryResultSetResponse);
-    String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
-    assertTrue(tajoDigest != null && !tajoDigest.isEmpty());
 
     DataInputStream queryResultSetInputStream =
         new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
@@ -213,7 +210,6 @@ public class TestQueryResultResource extends QueryTestCaseBase {
     boolean isFinished = false;
     List<Tuple> tupleList = TUtil.newList();
     RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema());
-    MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
     while (!isFinished) {
       try {
         int length = queryResultSetInputStream.readInt();
@@ -223,14 +219,12 @@ public class TestQueryResultResource extends QueryTestCaseBase {
         assertEquals(length, readBytes);
 
         tupleList.add(decoder.toTuple(dataByteArray));
-        messageDigest.update(dataByteArray);
       } catch (EOFException eof) {
         isFinished = true;
       }
     }
 
     assertEquals(5, tupleList.size());
-    assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest()));
 
     for (Tuple aTuple: tupleList) {
       assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
@@ -238,7 +232,7 @@ public class TestQueryResultResource extends QueryTestCaseBase {
   }
 
   @Test
-  public void testGetQueryResultSetWithDefaultCount() throws Exception {
+  public void testGetQueryResultSetWithDefaultCountWithBinary() throws Exception {
     String sessionId = generateNewSessionAndGetId();
     URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
     URI queryResultURI = new URI(queryIdURI + "/result");
@@ -260,19 +254,20 @@ public class TestQueryResultResource extends QueryTestCaseBase {
 
     Response queryResultSetResponse = restClient.target(queryResultSetURI)
         .request().header(tajoSessionIdHeaderName, sessionId)
+        .header(HttpHeaders.ACCEPT, "application/octet-stream")
         .get();
 
     assertNotNull(queryResultSetResponse);
-    String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
     int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
     int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
     boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+    int contentLength = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH));
 
-    assertTrue(tajoDigest != null && !tajoDigest.isEmpty());
     assertTrue(eos);
     assertEquals(0, offset);
     assertEquals(5, count);
 
+
     DataInputStream queryResultSetInputStream =
         new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
 
@@ -280,28 +275,136 @@ public class TestQueryResultResource extends QueryTestCaseBase {
 
     boolean isFinished = false;
     List<Tuple> tupleList = TUtil.newList();
+    int receviedSize = 0;
     RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema());
-    MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
     while (!isFinished) {
       try {
         int length = queryResultSetInputStream.readInt();
+        receviedSize += (length + 4);
         byte[] dataByteArray = new byte[length];
         int readBytes = queryResultSetInputStream.read(dataByteArray);
 
         assertEquals(length, readBytes);
 
         tupleList.add(decoder.toTuple(dataByteArray));
-        messageDigest.update(dataByteArray);
       } catch (EOFException eof) {
         isFinished = true;
       }
     }
 
+    assertEquals(contentLength, receviedSize);
     assertEquals(5, tupleList.size());
-    assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest()));
 
     for (Tuple aTuple: tupleList) {
       assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
     }
   }
+
+  @Test
+  public void testGetQueryResultSetWithCSV() throws Exception {
+    String sessionId = generateNewSessionAndGetId();
+    URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
+    URI queryResultURI = new URI(queryIdURI + "/result");
+
+    GetQueryResultDataResponse response = restClient.target(queryResultURI)
+            .request().header(tajoSessionIdHeaderName, sessionId)
+            .get(new GenericType<>(GetQueryResultDataResponse.class));
+
+    assertNotNull(response);
+    assertNotNull(response.getResultCode());
+    assertTrue(isOk(response.getResultCode()));
+    assertNotNull(response.getSchema());
+    assertEquals(16, response.getSchema().getRootColumns().size());
+    assertNotNull(response.getResultset());
+    assertTrue(response.getResultset().getId() != 0);
+    assertNotNull(response.getResultset().getLink());
+
+    URI queryResultSetURI = response.getResultset().getLink();
+
+    Response queryResultSetResponse = restClient.target(queryResultSetURI)
+            .request().header(tajoSessionIdHeaderName, sessionId)
+            .header(HttpHeaders.ACCEPT, "text/csv")
+            .get();
+
+    assertNotNull(queryResultSetResponse);
+    int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
+    int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
+    boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+    int length = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH));
+
+    assertTrue(eos);
+    assertEquals(0, offset);
+    assertEquals(5, count);
+    assertTrue(length > 0);
+
+    DataInputStream queryResultSetInputStream =
+            new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
+
+    assertNotNull(queryResultSetInputStream);
+
+    try {
+      byte[] dataByteArray = new byte[length];
+      int readBytes = queryResultSetInputStream.read(dataByteArray);
+
+      assertEquals(length, readBytes);
+
+    } catch (EOFException eof) {
+    }
+
+    assertEquals(5, count);
+  }
+
+  @Test
+  public void testGetQueryResultSetWithDefaultOutputType() throws Exception {
+    String sessionId = generateNewSessionAndGetId();
+    URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
+    URI queryResultURI = new URI(queryIdURI + "/result");
+
+    GetQueryResultDataResponse response = restClient.target(queryResultURI)
+            .request().header(tajoSessionIdHeaderName, sessionId)
+            .get(new GenericType<>(GetQueryResultDataResponse.class));
+
+    assertNotNull(response);
+    assertNotNull(response.getResultCode());
+    assertTrue(isOk(response.getResultCode()));
+    assertNotNull(response.getSchema());
+    assertEquals(16, response.getSchema().getRootColumns().size());
+    assertNotNull(response.getResultset());
+    assertTrue(response.getResultset().getId() != 0);
+    assertNotNull(response.getResultset().getLink());
+
+    URI queryResultSetURI = response.getResultset().getLink();
+
+    Response queryResultSetResponse = restClient.target(queryResultSetURI)
+            .request().header(tajoSessionIdHeaderName, sessionId)
+            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
+            .get();
+
+    assertNotNull(queryResultSetResponse);
+    int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
+    int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
+    boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+    int length = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH));
+
+    assertTrue(eos);
+    assertEquals(0, offset);
+    assertEquals(5, count);
+    assertTrue(length > 0);
+
+    DataInputStream queryResultSetInputStream =
+            new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
+
+    assertNotNull(queryResultSetInputStream);
+
+    try {
+      byte[] dataByteArray = new byte[length];
+      int readBytes = queryResultSetInputStream.read(dataByteArray);
+
+      assertEquals(length, readBytes);
+
+    } catch (EOFException eof) {
+    }
+
+    assertEquals(5, count);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 8953315..a1728ec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -179,6 +179,37 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
     ));
   }
 
+  public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException {
+    List<Tuple> rows = new ArrayList<>();
+    if (scanExec == null) {
+      return rows;
+    }
+    int rowCount = 0;
+    while (!eof) {
+      Tuple tuple = scanExec.next();
+      if (tuple == null) {
+        eof = true;
+        break;
+      }
+
+      rows.add(tuple);
+      rowCount++;
+      currentNumRows++;
+      if (rowCount >= fetchRowNum) {
+        break;
+      }
+      if (currentNumRows >= maxRow) {
+        eof = true;
+        break;
+      }
+    }
+
+    if(eof) {
+      close();
+    }
+    return rows;
+  }
+
   public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
     List<ByteString> rows = new ArrayList<>();
     if (scanExec == null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
index fab3a1f..e4f3475 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
 import java.util.List;
@@ -37,6 +38,8 @@ public interface NonForwardQueryResultScanner {
   @Deprecated
   List<ByteString> getNextRows(int fetchRowNum) throws IOException;
 
+  List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException;
+
   SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException;
 
   QueryId getQueryId();

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 7f6db9b..7f505a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -622,6 +622,38 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
   }
 
   @Override
+  public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException {
+    List<Tuple> rows = new ArrayList<>();
+    int startRow = currentRow;
+    int endRow = startRow + fetchRowNum;
+
+    if (physicalExec == null) {
+      return rows;
+    }
+
+    while (currentRow < endRow) {
+      Tuple currentTuple = physicalExec.next();
+
+      if (currentTuple == null) {
+        physicalExec.close();
+        physicalExec = null;
+        break;
+      }
+
+      currentRow++;
+      rows.add(currentTuple);
+
+      if (currentRow >= maxRow) {
+        physicalExec.close();
+        physicalExec = null;
+        break;
+      }
+    }
+
+    return rows;
+  }
+
+  @Override
   public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException {
     int rowCount = 0;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java
new file mode 100644
index 0000000..e37cfae
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Definition of the return type of Rest API.
+ * According to the output type, Rest APIs return their results in text or binary.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface RestReturnType {
+    String mimeType();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
index 93d397a..7d5c78a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
@@ -36,6 +36,8 @@ import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.ws.rs.*;
+import org.apache.tajo.ws.rs.resources.outputs.AbstractStreamingOutput;
+import org.apache.tajo.ws.rs.resources.outputs.RestOutputFactory;
 import org.apache.tajo.ws.rs.responses.GetQueryResultDataResponse;
 import org.apache.tajo.ws.rs.responses.ResultSetInfoResponse;
 
@@ -69,6 +71,7 @@ public class QueryResultResource {
   private static final String cacheIdKeyName = "cacheId";
   private static final String offsetKeyName = "offset";
   private static final String countKeyName = "count";
+  private static final String acceptTypeKeyName = "accept";
 
   private static final String tajoDigestHeaderName = "X-Tajo-Digest";
   private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
@@ -256,8 +259,8 @@ public class QueryResultResource {
 
   @GET
   @Path("{cacheId}")
-  @Produces(MediaType.APPLICATION_OCTET_STREAM)
   public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId,
+      @HeaderParam(HttpHeaders.ACCEPT) String acceptType,
       @PathParam("cacheId") String cacheId,
       @DefaultValue("100") @QueryParam("count") int count) {
     if (LOG.isDebugEnabled()) {
@@ -278,11 +281,16 @@ public class QueryResultResource {
       context.put(sessionIdKey, sessionId);
       JerseyResourceDelegateContextKey<Long> cacheIdKey =
           JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
+
       context.put(cacheIdKey, Long.valueOf(cacheId));
       JerseyResourceDelegateContextKey<Integer> countKey =
           JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class);
       context.put(countKey, count);
-      
+
+      JerseyResourceDelegateContextKey<String> acceptTypeKey =
+              JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class);
+      context.put(acceptTypeKey, acceptType);
+
       response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
           new GetQueryResultSetDelegate(),
           application,
@@ -307,6 +315,9 @@ public class QueryResultResource {
       JerseyResourceDelegateContextKey<String> queryIdKey =
           JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class);
       String queryId = context.get(queryIdKey);
+      JerseyResourceDelegateContextKey<String> acceptTypeKey =
+              JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class);
+      String acceptType = context.get(acceptTypeKey);
       JerseyResourceDelegateContextKey<Long> cacheIdKey =
           JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
       Long cacheId = context.get(cacheIdKey);
@@ -345,59 +356,30 @@ public class QueryResultResource {
           clientApplication.getCachedNonForwardResultScanner(queryIdObj, cacheId.longValue());
 
       try {
-        int start_offset = cachedQueryResultScanner.getCurrentRowNumber();
-        List<ByteString> output = cachedQueryResultScanner.getNextRows(count);
-        String digestString = getEncodedBase64DigestString(output);
-        boolean eos = count != output.size();
-
-        return Response.ok(new QueryResultStreamingOutput(output))
-          .header(tajoDigestHeaderName, digestString)
-          .header(tajoOffsetHeaderName, start_offset)
-          .header(tajoCountHeaderName, output.size())
-          .header(tajoEOSHeaderName, eos)
-          .build();
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-
-        return ResourcesUtil.createExceptionResponse(null, e.getMessage());
-      } catch (NoSuchAlgorithmException e) {
-        LOG.error(e.getMessage(), e);
-
-        return ResourcesUtil.createExceptionResponse(null, e.getMessage());
-      }
-    }
-
-    private String getEncodedBase64DigestString(List<ByteString> outputList) throws NoSuchAlgorithmException {
-      MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
-
-      for (ByteString byteString: outputList) {
-        messageDigest.update(byteString.toByteArray());
-      }
-
-      return Base64.encodeBase64String(messageDigest.digest());
-    }
-  }
-
-  private static class QueryResultStreamingOutput implements StreamingOutput {
+        int startOffset = cachedQueryResultScanner.getCurrentRowNumber();
+        AbstractStreamingOutput restOutput = RestOutputFactory.get(acceptType, cachedQueryResultScanner, count, startOffset);
+        if (restOutput == null) {
+          return ResourcesUtil.createExceptionResponse(null, acceptType);
+        }
 
-    private final List<ByteString> outputList;
+        int size = restOutput.count();
+        boolean eos = count != size;
 
-    public QueryResultStreamingOutput(List<ByteString> outputList) {
-      this.outputList = outputList;
-    }
+        Response.ResponseBuilder builder = Response.ok(restOutput)
+          .header(tajoOffsetHeaderName, startOffset)
+          .header(tajoCountHeaderName, size)
+          .header(tajoEOSHeaderName, eos)
+          .header(HttpHeaders.CONTENT_TYPE, restOutput.contentType());
 
-    @Override
-    public void write(OutputStream outputStream) throws IOException, WebApplicationException {
-      DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
+        if (restOutput.hasLength()) {
+          builder.header(HttpHeaders.CONTENT_LENGTH, restOutput.length());
+        }
 
-      for (ByteString byteString: outputList) {
-        byte[] byteStringArray = byteString.toByteArray();
-        streamingOutputStream.writeInt(byteStringArray.length);
-        streamingOutputStream.write(byteStringArray);
+        return builder.build();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        return ResourcesUtil.createExceptionResponse(null, e.getMessage());
       }
-
-      streamingOutputStream.flush();
     }
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java
new file mode 100644
index 0000000..f1e0eb5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.StreamingOutput;
+
+public abstract class AbstractStreamingOutput implements StreamingOutput {
+  protected NonForwardQueryResultScanner scanner;
+  protected int count;
+  protected int startOffset;
+
+  public AbstractStreamingOutput(NonForwardQueryResultScanner scanner, Integer count, Integer startoffset) {
+    this.scanner = scanner;
+    this.count = count;
+    this.startOffset = startoffset;
+  }
+
+  public abstract boolean hasLength();
+  public abstract int count();
+  public abstract int length();
+
+  public String contentType() {
+    return MediaType.APPLICATION_OCTET_STREAM;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java
new file mode 100644
index 0000000..13afc7b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import com.google.protobuf.ByteString;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.ws.rs.annotation.RestReturnType;
+
+import javax.ws.rs.WebApplicationException;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@RestReturnType(
+  mimeType = "application/octet-stream"
+)
+public class BinaryStreamingOutput extends AbstractStreamingOutput {
+  private List<byte[]> byteOutputLists = null;
+  private int length = -1;
+
+  public BinaryStreamingOutput(NonForwardQueryResultScanner scanner, Integer count, Integer startOffset) throws IOException {
+    super(scanner, count, startOffset);
+  }
+
+  @Override
+  public boolean hasLength() {
+    return true;
+  }
+
+  private void fetch() {
+    if (length == -1) {
+      length = fill();
+    }
+  }
+
+  @Override
+  public int length() {
+    fetch();
+    return length;
+  }
+
+  @Override
+  public int count() {
+    try {
+      fetch();
+      return byteOutputLists.size();
+    } catch (Exception e) {
+      return 0;
+    }
+  }
+
+  @Override
+  public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+    fetch();
+
+    DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
+
+    for (byte[] bytes: byteOutputLists) {
+      streamingOutputStream.writeInt(bytes.length);
+      streamingOutputStream.write(bytes);
+    }
+
+    streamingOutputStream.flush();
+  }
+
+  private int fill() {
+    int tmpLen = 0;
+    try {
+      byteOutputLists = new ArrayList<byte[]>();
+
+      List<ByteString> outputList = scanner.getNextRows(count);
+      for (ByteString byteString : outputList) {
+        byte[] byteStringArray = byteString.toByteArray();
+        byteOutputLists.add(byteStringArray);
+        tmpLen += 4;
+        tmpLen += byteStringArray.length;
+      }
+    } catch (IOException e) {
+    }
+
+    return tmpLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java
new file mode 100644
index 0000000..8b2d3ab
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.ws.rs.annotation.RestReturnType;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Renders query result tuples as CSV text encoded in UTF-8.
+ *
+ * <p>Fields are separated by {@code ","} and records are terminated by
+ * {@code "\r\n"}; every value is passed through
+ * {@link StringEscapeUtils#escapeCsv(String)}. A header line with the simple
+ * column names is emitted only when {@code startOffset} is {@code 0}.
+ * The payload is built once, on first use, and cached.</p>
+ */
+@RestReturnType(
+  mimeType = "text/csv"
+)
+public class CSVStreamingOutput extends AbstractStreamingOutput {
+  /** Keep in sync with the {@code mimeType} of the {@link RestReturnType} annotation on this class. */
+  private static final String CSV_MIME_TYPE = "text/csv";
+  private static final String CHARSET_NAME = "utf-8";
+  private static final String FIELD_SEPARATOR = ",";
+  private static final String RECORD_SEPARATOR = "\r\n";
+
+  /** Encoded CSV payload; {@code null} until {@link #fetch()} has completed successfully. */
+  private byte[] outputBytes;
+  /** Number of tuples in the payload; the header line is not counted. */
+  private int size = 0;
+
+  /**
+   * Creates a CSV result writer; all three arguments are handed to the
+   * superclass constructor unchanged.
+   *
+   * @param cachedQueryResultScanner source of the result tuples and of the logical schema
+   * @param count                    value later passed to {@code scanner.getNextTupleRows(...)}
+   * @param startOffset              when {@code 0}, a header line is written before the rows
+   * @throws IOException declared for callers; the body itself only delegates to {@code super}
+   */
+  public CSVStreamingOutput(NonForwardQueryResultScanner cachedQueryResultScanner, Integer count, Integer startOffset) throws IOException {
+    super(cachedQueryResultScanner, count, startOffset);
+  }
+
+  /**
+   * @return always {@code true}; see {@link #length()}
+   */
+  @Override
+  public boolean hasLength() {
+    return true;
+  }
+
+  /**
+   * Size of the encoded payload in bytes, i.e. exactly the number of bytes
+   * that {@link #write(OutputStream)} sends. (The character count of the text
+   * would be too small as soon as the data contains non-ASCII characters.)
+   *
+   * @return the payload size in bytes, or {@code 0} if building the payload failed
+   */
+  @Override
+  public int length() {
+    try {
+      fetch();
+      return outputBytes.length;
+    } catch (Exception e) {
+      // Kept from the original contract: a failure is reported as length 0.
+      return 0;
+    }
+  }
+
+  /**
+   * @return the number of tuples in the payload, or {@code 0} if building the payload failed
+   */
+  @Override
+  public int count() {
+    try {
+      fetch();
+      return size;
+    } catch (Exception e) {
+      // Kept from the original contract: a failure is reported as 0 rows.
+      return 0;
+    }
+  }
+
+  /**
+   * Builds and caches the CSV payload on first use; later calls return immediately.
+   *
+   * @throws IOException if the scanner fails or the text cannot be encoded
+   */
+  private void fetch() throws IOException {
+    if (outputBytes != null) {
+      return;
+    }
+
+    List<Tuple> outputTupleList = scanner.getNextTupleRows(count);
+
+    StringBuilder sb = new StringBuilder();
+    if (startOffset == 0) {
+      appendHeader(sb);
+    }
+
+    for (Tuple tuple : outputTupleList) {
+      Datum[] datums = tuple.getValues();
+      for (int i = 0; i < datums.length; i++) {
+        if (i != 0) {
+          sb.append(FIELD_SEPARATOR);
+        }
+        sb.append(StringEscapeUtils.escapeCsv(datums[i].toString()));
+      }
+      sb.append(RECORD_SEPARATOR);
+    }
+
+    size = outputTupleList.size();
+    // Assigned last so that a failure above leaves the cache empty.
+    outputBytes = sb.toString().getBytes(CHARSET_NAME);
+  }
+
+  /** Appends one line holding the escaped simple names of all columns of the logical schema. */
+  private void appendHeader(StringBuilder sb) {
+    Schema schema = this.scanner.getLogicalSchema();
+    List<Column> columns = schema.getAllColumns();
+
+    boolean first = true;
+    for (Column column : columns) {
+      if (!first) {
+        sb.append(FIELD_SEPARATOR);
+      }
+      sb.append(StringEscapeUtils.escapeCsv(column.getSimpleName()));
+      first = false;
+    }
+
+    sb.append(RECORD_SEPARATOR);
+  }
+
+  /**
+   * Sends the cached payload. The stream is flushed but not closed here.
+   *
+   * @param outputStream destination stream supplied by the caller
+   * @throws IOException if the payload cannot be built or written
+   * @throws WebApplicationException declared by the signature
+   */
+  @Override
+  public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+    fetch();
+    DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
+    streamingOutputStream.write(outputBytes);
+    streamingOutputStream.flush();
+  }
+
+  /**
+   * @return {@code "text/csv"}, matching the {@link RestReturnType} declared on this class
+   */
+  @Override
+  public String contentType() {
+    return CSV_MIME_TYPE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java
new file mode 100644
index 0000000..90dd301
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.util.ClassUtil;
+import org.apache.tajo.ws.rs.annotation.RestReturnType;
+
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Maps a mime type to the {@link AbstractStreamingOutput} implementation that
+ * declares it through {@link RestReturnType}, and creates instances of it.
+ *
+ * <p>The registry is built once, when this class is initialized, by scanning
+ * the outputs package; afterwards it is only read.</p>
+ */
+public class RestOutputFactory {
+  private static final Log LOG = LogFactory.getLog(RestOutputFactory.class);
+  private static final String OUTPUT_PACKAGE = "org.apache.tajo.ws.rs.resources.outputs";
+
+  /** mime type -> implementation class. Must be declared after {@code LOG}, which {@link #load()} uses. */
+  private static final Map<String, Class<? extends AbstractStreamingOutput>> restOutputClasses = load();
+
+  /**
+   * Scans the outputs package and registers every concrete class that
+   * extends {@link AbstractStreamingOutput}, offers the
+   * {@code (NonForwardQueryResultScanner, Integer, Integer)} constructor and
+   * carries a {@link RestReturnType} annotation with a non-empty mime type.
+   * Classes that do not qualify are skipped with a warning instead of
+   * aborting class initialization.
+   */
+  private static Map<String, Class<? extends AbstractStreamingOutput>> load() {
+    Map<String, Class<? extends AbstractStreamingOutput>> outputClasses = Maps.newHashMap();
+    // Element type is the raw Class because that is what findClasses is assigned to here.
+    Set<Class> candidates = ClassUtil.findClasses(AbstractStreamingOutput.class, OUTPUT_PACKAGE);
+
+    for (Class<?> eachClass : candidates) {
+      if (eachClass.isInterface() ||
+          Modifier.isAbstract(eachClass.getModifiers())) {
+        continue;
+      }
+
+      Class<? extends AbstractStreamingOutput> outputClass;
+      try {
+        outputClass = eachClass.asSubclass(AbstractStreamingOutput.class);
+        // Only check that the constructor used by get() exists. Creating a
+        // throw-away instance with a null scanner is unnecessary for reading
+        // class-level metadata.
+        outputClass.getDeclaredConstructor(
+            NonForwardQueryResultScanner.class, Integer.class, Integer.class);
+      } catch (Exception e) {
+        LOG.warn(eachClass + " cannot be used as a REST output class because of " + e.getMessage(), e);
+        continue;
+      }
+
+      RestReturnType returnType = outputClass.getAnnotation(RestReturnType.class);
+      if (returnType == null) {
+        LOG.warn(eachClass + " has no RestReturnType annotation and is ignored");
+        continue;
+      }
+
+      String headerType = returnType.mimeType();
+      if (StringUtils.isNotEmpty(headerType)) {
+        // The Class object is stored (not its canonical name) so that no
+        // name-based lookup is needed when an instance is requested.
+        outputClasses.put(headerType, outputClass);
+      }
+    }
+
+    return outputClasses;
+  }
+
+  /**
+   * Creates the streaming output registered for {@code mimeType}. When no
+   * class is registered for it (including a {@code null} mime type), a
+   * {@link CSVStreamingOutput} is created instead.
+   *
+   * @param mimeType    requested mime type, used as the registry key
+   * @param scanner     passed to the output's constructor
+   * @param count       passed to the output's constructor
+   * @param startOffset passed to the output's constructor
+   * @return the new output, or {@code null} if it could not be created (the
+   *         failure is logged with its stack trace); callers must check for {@code null}
+   */
+  public static AbstractStreamingOutput get(String mimeType, NonForwardQueryResultScanner scanner, Integer count, Integer startOffset) {
+    AbstractStreamingOutput output = null;
+    try {
+      Class<? extends AbstractStreamingOutput> outputClass = restOutputClasses.get(mimeType);
+      if (outputClass != null) {
+        output = outputClass.getDeclaredConstructor(
+                  NonForwardQueryResultScanner.class, Integer.class, Integer.class)
+                  .newInstance(scanner, count, startOffset);
+      } else {
+        output = new CSVStreamingOutput(scanner, count, startOffset);
+      }
+    } catch (Exception eh) {
+      LOG.error("Cannot create a streaming output for mime type '" + mimeType + "'", eh);
+    }
+
+    return output;
+  }
+}