You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ch...@apache.org on 2015/12/25 16:29:23 UTC
tajo git commit: TAJO-1944: Support text resultset for REST
Repository: tajo
Updated Branches:
refs/heads/master 1f9ae1da0 -> 62cc6f686
TAJO-1944: Support text resultset for REST
Closes #876
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/62cc6f68
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/62cc6f68
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/62cc6f68
Branch: refs/heads/master
Commit: 62cc6f686021c91c85edcde00383297b711af248
Parents: 1f9ae1d
Author: charsyam <ch...@apache.org>
Authored: Sat Dec 26 00:27:29 2015 +0900
Committer: charsyam <ch...@apache.org>
Committed: Sat Dec 26 00:27:29 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../rs/resources/TestQueryResultResource.java | 133 ++++++++++++++++---
.../exec/NonForwardQueryResultFileScanner.java | 31 +++++
.../exec/NonForwardQueryResultScanner.java | 3 +
.../NonForwardQueryResultSystemScanner.java | 32 +++++
.../tajo/ws/rs/annotation/RestReturnType.java | 34 +++++
.../ws/rs/resources/QueryResultResource.java | 82 +++++-------
.../outputs/AbstractStreamingOutput.java | 43 ++++++
.../outputs/BinaryStreamingOutput.java | 102 ++++++++++++++
.../resources/outputs/CSVStreamingOutput.java | 129 ++++++++++++++++++
.../rs/resources/outputs/RestOutputFactory.java | 85 ++++++++++++
11 files changed, 611 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c02bfc7..fea896c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-1944: Support text resultset for REST (DaeMyung)
+
TAJO-1950: Query master uses too much memory during range shuffle. (jihoon)
TAJO-1858: Aligning error message in execute query page of web UI is needed.
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java b/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
index 2ab1add..26fa011 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
@@ -18,7 +18,6 @@
package org.apache.tajo.ws.rs.resources;
-import org.apache.commons.codec.binary.Base64;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -44,6 +43,7 @@ import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -52,7 +52,6 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.net.URI;
-import java.security.MessageDigest;
import java.util.List;
import static org.apache.tajo.exception.ErrorUtil.isOk;
@@ -65,7 +64,6 @@ public class TestQueryResultResource extends QueryTestCaseBase {
private Client restClient;
private static final String tajoSessionIdHeaderName = "X-Tajo-Session";
- private static final String tajoDigestHeaderName = "X-Tajo-Digest";
private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
private static final String tajoCountHeaderName = "X-Tajo-Count";
private static final String tajoEOSHeaderName = "X-Tajo-EOS";
@@ -176,7 +174,7 @@ public class TestQueryResultResource extends QueryTestCaseBase {
}
@Test
- public void testGetQueryResultSet() throws Exception {
+ public void testGetQueryResultSetWithBinary() throws Exception {
String sessionId = generateNewSessionAndGetId();
URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
URI queryResultURI = new URI(queryIdURI + "/result");
@@ -199,11 +197,10 @@ public class TestQueryResultResource extends QueryTestCaseBase {
Response queryResultSetResponse = restClient.target(queryResultSetURI)
.queryParam("count", 100)
.request().header(tajoSessionIdHeaderName, sessionId)
+ .header(HttpHeaders.ACCEPT, "application/octet-stream")
.get();
assertNotNull(queryResultSetResponse);
- String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
- assertTrue(tajoDigest != null && !tajoDigest.isEmpty());
DataInputStream queryResultSetInputStream =
new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
@@ -213,7 +210,6 @@ public class TestQueryResultResource extends QueryTestCaseBase {
boolean isFinished = false;
List<Tuple> tupleList = TUtil.newList();
RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema());
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
while (!isFinished) {
try {
int length = queryResultSetInputStream.readInt();
@@ -223,14 +219,12 @@ public class TestQueryResultResource extends QueryTestCaseBase {
assertEquals(length, readBytes);
tupleList.add(decoder.toTuple(dataByteArray));
- messageDigest.update(dataByteArray);
} catch (EOFException eof) {
isFinished = true;
}
}
assertEquals(5, tupleList.size());
- assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest()));
for (Tuple aTuple: tupleList) {
assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
@@ -238,7 +232,7 @@ public class TestQueryResultResource extends QueryTestCaseBase {
}
@Test
- public void testGetQueryResultSetWithDefaultCount() throws Exception {
+ public void testGetQueryResultSetWithDefaultCountWithBinary() throws Exception {
String sessionId = generateNewSessionAndGetId();
URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
URI queryResultURI = new URI(queryIdURI + "/result");
@@ -260,19 +254,20 @@ public class TestQueryResultResource extends QueryTestCaseBase {
Response queryResultSetResponse = restClient.target(queryResultSetURI)
.request().header(tajoSessionIdHeaderName, sessionId)
+ .header(HttpHeaders.ACCEPT, "application/octet-stream")
.get();
assertNotNull(queryResultSetResponse);
- String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+ int contentLength = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH));
- assertTrue(tajoDigest != null && !tajoDigest.isEmpty());
assertTrue(eos);
assertEquals(0, offset);
assertEquals(5, count);
+
DataInputStream queryResultSetInputStream =
new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
@@ -280,28 +275,136 @@ public class TestQueryResultResource extends QueryTestCaseBase {
boolean isFinished = false;
List<Tuple> tupleList = TUtil.newList();
+ int receviedSize = 0;
RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema());
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
while (!isFinished) {
try {
int length = queryResultSetInputStream.readInt();
+ receviedSize += (length + 4);
byte[] dataByteArray = new byte[length];
int readBytes = queryResultSetInputStream.read(dataByteArray);
assertEquals(length, readBytes);
tupleList.add(decoder.toTuple(dataByteArray));
- messageDigest.update(dataByteArray);
} catch (EOFException eof) {
isFinished = true;
}
}
+ assertEquals(contentLength, receviedSize);
assertEquals(5, tupleList.size());
- assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest()));
for (Tuple aTuple: tupleList) {
assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
}
}
+
+ @Test
+ public void testGetQueryResultSetWithCSV() throws Exception {
+ String sessionId = generateNewSessionAndGetId();
+ URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
+ URI queryResultURI = new URI(queryIdURI + "/result");
+
+ GetQueryResultDataResponse response = restClient.target(queryResultURI)
+ .request().header(tajoSessionIdHeaderName, sessionId)
+ .get(new GenericType<>(GetQueryResultDataResponse.class));
+
+ assertNotNull(response);
+ assertNotNull(response.getResultCode());
+ assertTrue(isOk(response.getResultCode()));
+ assertNotNull(response.getSchema());
+ assertEquals(16, response.getSchema().getRootColumns().size());
+ assertNotNull(response.getResultset());
+ assertTrue(response.getResultset().getId() != 0);
+ assertNotNull(response.getResultset().getLink());
+
+ URI queryResultSetURI = response.getResultset().getLink();
+
+ Response queryResultSetResponse = restClient.target(queryResultSetURI)
+ .request().header(tajoSessionIdHeaderName, sessionId)
+ .header(HttpHeaders.ACCEPT, "text/csv")
+ .get();
+
+ assertNotNull(queryResultSetResponse);
+ int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
+ int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
+ boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+ int length = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH));
+
+ assertTrue(eos);
+ assertEquals(0, offset);
+ assertEquals(5, count);
+ assertTrue(length > 0);
+
+ DataInputStream queryResultSetInputStream =
+ new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
+
+ assertNotNull(queryResultSetInputStream);
+
+ try {
+ byte[] dataByteArray = new byte[length];
+ int readBytes = queryResultSetInputStream.read(dataByteArray);
+
+ assertEquals(length, readBytes);
+
+ } catch (EOFException eof) {
+ }
+
+ assertEquals(5, count);
+ }
+
+ @Test
+ public void testGetQueryResultSetWithDefaultOutputType() throws Exception {
+ String sessionId = generateNewSessionAndGetId();
+ URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
+ URI queryResultURI = new URI(queryIdURI + "/result");
+
+ GetQueryResultDataResponse response = restClient.target(queryResultURI)
+ .request().header(tajoSessionIdHeaderName, sessionId)
+ .get(new GenericType<>(GetQueryResultDataResponse.class));
+
+ assertNotNull(response);
+ assertNotNull(response.getResultCode());
+ assertTrue(isOk(response.getResultCode()));
+ assertNotNull(response.getSchema());
+ assertEquals(16, response.getSchema().getRootColumns().size());
+ assertNotNull(response.getResultset());
+ assertTrue(response.getResultset().getId() != 0);
+ assertNotNull(response.getResultset().getLink());
+
+ URI queryResultSetURI = response.getResultset().getLink();
+
+ Response queryResultSetResponse = restClient.target(queryResultSetURI)
+ .request().header(tajoSessionIdHeaderName, sessionId)
+ .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
+ .get();
+
+ assertNotNull(queryResultSetResponse);
+ int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
+ int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
+ boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+ int length = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH));
+
+ assertTrue(eos);
+ assertEquals(0, offset);
+ assertEquals(5, count);
+ assertTrue(length > 0);
+
+ DataInputStream queryResultSetInputStream =
+ new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
+
+ assertNotNull(queryResultSetInputStream);
+
+ try {
+ byte[] dataByteArray = new byte[length];
+ int readBytes = queryResultSetInputStream.read(dataByteArray);
+
+ assertEquals(length, readBytes);
+
+ } catch (EOFException eof) {
+ }
+
+ assertEquals(5, count);
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 8953315..a1728ec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -179,6 +179,37 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
));
}
+ public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException {
+ List<Tuple> rows = new ArrayList<>();
+ if (scanExec == null) {
+ return rows;
+ }
+ int rowCount = 0;
+ while (!eof) {
+ Tuple tuple = scanExec.next();
+ if (tuple == null) {
+ eof = true;
+ break;
+ }
+
+ rows.add(tuple);
+ rowCount++;
+ currentNumRows++;
+ if (rowCount >= fetchRowNum) {
+ break;
+ }
+ if (currentNumRows >= maxRow) {
+ eof = true;
+ break;
+ }
+ }
+
+ if(eof) {
+ close();
+ }
+ return rows;
+ }
+
public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
List<ByteString> rows = new ArrayList<>();
if (scanExec == null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
index fab3a1f..e4f3475 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+import org.apache.tajo.storage.Tuple;
import java.io.IOException;
import java.util.List;
@@ -37,6 +38,8 @@ public interface NonForwardQueryResultScanner {
@Deprecated
List<ByteString> getNextRows(int fetchRowNum) throws IOException;
+ List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException;
+
SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException;
QueryId getQueryId();
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 7f6db9b..7f505a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -622,6 +622,38 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
}
@Override
+ public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException {
+ List<Tuple> rows = new ArrayList<>();
+ int startRow = currentRow;
+ int endRow = startRow + fetchRowNum;
+
+ if (physicalExec == null) {
+ return rows;
+ }
+
+ while (currentRow < endRow) {
+ Tuple currentTuple = physicalExec.next();
+
+ if (currentTuple == null) {
+ physicalExec.close();
+ physicalExec = null;
+ break;
+ }
+
+ currentRow++;
+ rows.add(currentTuple);
+
+ if (currentRow >= maxRow) {
+ physicalExec.close();
+ physicalExec = null;
+ break;
+ }
+ }
+
+ return rows;
+ }
+
+ @Override
public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException {
int rowCount = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java
new file mode 100644
index 0000000..e37cfae
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Definition of the return type of Rest API.
+ * According to the output type, Rest APIs return their results in text or binary.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface RestReturnType {
+ String mimeType();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
index 93d397a..7d5c78a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
@@ -36,6 +36,8 @@ import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.ws.rs.*;
+import org.apache.tajo.ws.rs.resources.outputs.AbstractStreamingOutput;
+import org.apache.tajo.ws.rs.resources.outputs.RestOutputFactory;
import org.apache.tajo.ws.rs.responses.GetQueryResultDataResponse;
import org.apache.tajo.ws.rs.responses.ResultSetInfoResponse;
@@ -69,6 +71,7 @@ public class QueryResultResource {
private static final String cacheIdKeyName = "cacheId";
private static final String offsetKeyName = "offset";
private static final String countKeyName = "count";
+ private static final String acceptTypeKeyName = "accept";
private static final String tajoDigestHeaderName = "X-Tajo-Digest";
private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
@@ -256,8 +259,8 @@ public class QueryResultResource {
@GET
@Path("{cacheId}")
- @Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId,
+ @HeaderParam(HttpHeaders.ACCEPT) String acceptType,
@PathParam("cacheId") String cacheId,
@DefaultValue("100") @QueryParam("count") int count) {
if (LOG.isDebugEnabled()) {
@@ -278,11 +281,16 @@ public class QueryResultResource {
context.put(sessionIdKey, sessionId);
JerseyResourceDelegateContextKey<Long> cacheIdKey =
JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
+
context.put(cacheIdKey, Long.valueOf(cacheId));
JerseyResourceDelegateContextKey<Integer> countKey =
JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class);
context.put(countKey, count);
-
+
+ JerseyResourceDelegateContextKey<String> acceptTypeKey =
+ JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class);
+ context.put(acceptTypeKey, acceptType);
+
response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
new GetQueryResultSetDelegate(),
application,
@@ -307,6 +315,9 @@ public class QueryResultResource {
JerseyResourceDelegateContextKey<String> queryIdKey =
JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class);
String queryId = context.get(queryIdKey);
+ JerseyResourceDelegateContextKey<String> acceptTypeKey =
+ JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class);
+ String acceptType = context.get(acceptTypeKey);
JerseyResourceDelegateContextKey<Long> cacheIdKey =
JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
Long cacheId = context.get(cacheIdKey);
@@ -345,59 +356,30 @@ public class QueryResultResource {
clientApplication.getCachedNonForwardResultScanner(queryIdObj, cacheId.longValue());
try {
- int start_offset = cachedQueryResultScanner.getCurrentRowNumber();
- List<ByteString> output = cachedQueryResultScanner.getNextRows(count);
- String digestString = getEncodedBase64DigestString(output);
- boolean eos = count != output.size();
-
- return Response.ok(new QueryResultStreamingOutput(output))
- .header(tajoDigestHeaderName, digestString)
- .header(tajoOffsetHeaderName, start_offset)
- .header(tajoCountHeaderName, output.size())
- .header(tajoEOSHeaderName, eos)
- .build();
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
-
- return ResourcesUtil.createExceptionResponse(null, e.getMessage());
- } catch (NoSuchAlgorithmException e) {
- LOG.error(e.getMessage(), e);
-
- return ResourcesUtil.createExceptionResponse(null, e.getMessage());
- }
- }
-
- private String getEncodedBase64DigestString(List<ByteString> outputList) throws NoSuchAlgorithmException {
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
-
- for (ByteString byteString: outputList) {
- messageDigest.update(byteString.toByteArray());
- }
-
- return Base64.encodeBase64String(messageDigest.digest());
- }
- }
-
- private static class QueryResultStreamingOutput implements StreamingOutput {
+ int startOffset = cachedQueryResultScanner.getCurrentRowNumber();
+ AbstractStreamingOutput restOutput = RestOutputFactory.get(acceptType, cachedQueryResultScanner, count, startOffset);
+ if (restOutput == null) {
+ return ResourcesUtil.createExceptionResponse(null, acceptType);
+ }
- private final List<ByteString> outputList;
+ int size = restOutput.count();
+ boolean eos = count != size;
- public QueryResultStreamingOutput(List<ByteString> outputList) {
- this.outputList = outputList;
- }
+ Response.ResponseBuilder builder = Response.ok(restOutput)
+ .header(tajoOffsetHeaderName, startOffset)
+ .header(tajoCountHeaderName, size)
+ .header(tajoEOSHeaderName, eos)
+ .header(HttpHeaders.CONTENT_TYPE, restOutput.contentType());
- @Override
- public void write(OutputStream outputStream) throws IOException, WebApplicationException {
- DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
+ if (restOutput.hasLength()) {
+ builder.header(HttpHeaders.CONTENT_LENGTH, restOutput.length());
+ }
- for (ByteString byteString: outputList) {
- byte[] byteStringArray = byteString.toByteArray();
- streamingOutputStream.writeInt(byteStringArray.length);
- streamingOutputStream.write(byteStringArray);
+ return builder.build();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ return ResourcesUtil.createExceptionResponse(null, e.getMessage());
}
-
- streamingOutputStream.flush();
}
}
-
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java
new file mode 100644
index 0000000..f1e0eb5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.StreamingOutput;
+
+public abstract class AbstractStreamingOutput implements StreamingOutput {
+ protected NonForwardQueryResultScanner scanner;
+ protected int count;
+ protected int startOffset;
+
+ public AbstractStreamingOutput(NonForwardQueryResultScanner scanner, Integer count, Integer startoffset) {
+ this.scanner = scanner;
+ this.count = count;
+ this.startOffset = startoffset;
+ }
+
+ public abstract boolean hasLength();
+ public abstract int count();
+ public abstract int length();
+
+ public String contentType() {
+ return MediaType.APPLICATION_OCTET_STREAM;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java
new file mode 100644
index 0000000..13afc7b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import com.google.protobuf.ByteString;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.ws.rs.annotation.RestReturnType;
+
+import javax.ws.rs.WebApplicationException;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@RestReturnType(
+ mimeType = "application/octet-stream"
+)
+public class BinaryStreamingOutput extends AbstractStreamingOutput {
+ private List<byte[]> byteOutputLists = null;
+ private int length = -1;
+
+ public BinaryStreamingOutput(NonForwardQueryResultScanner scanner, Integer count, Integer startOffset) throws IOException {
+ super(scanner, count, startOffset);
+ }
+
+ @Override
+ public boolean hasLength() {
+ return true;
+ }
+
+ private void fetch() {
+ if (length == -1) {
+ length = fill();
+ }
+ }
+
+ @Override
+ public int length() {
+ fetch();
+ return length;
+ }
+
+ @Override
+ public int count() {
+ try {
+ fetch();
+ return byteOutputLists.size();
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ @Override
+ public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+ fetch();
+
+ DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
+
+ for (byte[] bytes: byteOutputLists) {
+ streamingOutputStream.writeInt(bytes.length);
+ streamingOutputStream.write(bytes);
+ }
+
+ streamingOutputStream.flush();
+ }
+
+ private int fill() {
+ int tmpLen = 0;
+ try {
+ byteOutputLists = new ArrayList<byte[]>();
+
+ List<ByteString> outputList = scanner.getNextRows(count);
+ for (ByteString byteString : outputList) {
+ byte[] byteStringArray = byteString.toByteArray();
+ byteOutputLists.add(byteStringArray);
+ tmpLen += 4;
+ tmpLen += byteStringArray.length;
+ }
+ } catch (IOException e) {
+ }
+
+ return tmpLen;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java
new file mode 100644
index 0000000..8b2d3ab
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.ws.rs.annotation.RestReturnType;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+@RestReturnType(
+ mimeType = "text/csv"
+)
+public class CSVStreamingOutput extends AbstractStreamingOutput {
+ private String output;
+ private boolean alreadyCalculated = false;
+ private int size = 0;
+
+ public CSVStreamingOutput(NonForwardQueryResultScanner cachedQueryResultScanner, Integer count, Integer startOffset) throws IOException {
+ super(cachedQueryResultScanner, count, startOffset);
+ }
+
+ @Override
+ public boolean hasLength() {
+ return true;
+ }
+
+ @Override
+ public int length() {
+ try {
+ fetch();
+ return output.length();
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ @Override
+ public int count() {
+ try {
+ fetch();
+ return size;
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ private void fetch() throws IOException {
+ if (output != null) {
+ return;
+ }
+
+ List<Tuple> outputTupletList = scanner.getNextTupleRows(count);
+ size = outputTupletList.size();
+
+ StringBuilder sb = new StringBuilder();
+ if (startOffset == 0) {
+ Schema schema = this.scanner.getLogicalSchema();
+ List<Column> columns = schema.getAllColumns();
+ boolean first = true;
+ for (Column column : columns) {
+ if (!first) {
+ sb.append(",");
+ }
+
+ sb.append(StringEscapeUtils.escapeCsv(column.getSimpleName()));
+ first = false;
+ }
+
+ sb.append("\r\n");
+ }
+
+ for (Tuple tuple : outputTupletList) {
+ Datum[] datums = tuple.getValues();
+ int size = datums.length;
+
+ for (int i = 0; i < size; i++) {
+ if (i != 0) {
+ sb.append(",");
+ }
+ Datum datum = datums[i];
+ sb.append(StringEscapeUtils.escapeCsv(datum.toString()));
+ }
+
+ sb.append("\r\n");
+ }
+
+ output = sb.toString();
+ }
+
+ @Override
+ public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+ fetch();
+ DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
+ streamingOutputStream.write(output.getBytes("utf-8"));
+ streamingOutputStream.flush();
+ }
+
+ @Override
+ public String contentType() {
+ return MediaType.APPLICATION_JSON;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java
new file mode 100644
index 0000000..90dd301
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources.outputs;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.util.ClassUtil;
+import org.apache.tajo.ws.rs.annotation.RestReturnType;
+
+import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.Set;
+
+public class RestOutputFactory {
+ private static Log LOG = LogFactory.getLog(RestOutputFactory.class);
+ private static Map<String, String> restOutputClasses = load();
+
+ private static Map<String, String> load() {
+ Map<String, String> outputClasses = Maps.newHashMap();
+ Set<Class> restOutputClasses = ClassUtil.findClasses(AbstractStreamingOutput.class, "org.apache.tajo.ws.rs.resources.outputs");
+
+ for (Class eachClass : restOutputClasses) {
+ if (eachClass.isInterface() ||
+ Modifier.isAbstract(eachClass.getModifiers())) {
+ continue;
+ }
+
+ AbstractStreamingOutput streamingOutput = null;
+ try {
+ streamingOutput = (AbstractStreamingOutput) eachClass.getDeclaredConstructor(
+ new Class[]{NonForwardQueryResultScanner.class, Integer.class, Integer.class}).newInstance(null, 0, 0);
+ } catch (Exception e) {
+ LOG.warn(eachClass + " cannot instantiate Function class because of " + e.getMessage(), e);
+ continue;
+ }
+ String className = streamingOutput.getClass().getCanonicalName();
+ String headerType = streamingOutput.getClass().getAnnotation(RestReturnType.class).mimeType();
+
+ if (StringUtils.isNotEmpty(headerType)) {
+ outputClasses.put(headerType, className);
+ }
+ }
+
+ return outputClasses;
+ }
+
+ public static AbstractStreamingOutput get(String mimeType, NonForwardQueryResultScanner scanner, Integer count, Integer startOffset) {
+ AbstractStreamingOutput output = null;
+ try {
+ if (restOutputClasses.containsKey(mimeType)) {
+ String className = (String) restOutputClasses.get(mimeType);
+ Class<?> clazz = Class.forName(className);
+ output = (AbstractStreamingOutput) clazz.getDeclaredConstructor(
+ new Class[]{NonForwardQueryResultScanner.class,
+ Integer.class, Integer.class})
+ .newInstance(scanner, count, startOffset);
+ } else {
+ output = new CSVStreamingOutput(scanner, count, startOffset);
+ }
+ } catch (Exception eh) {
+ LOG.error(eh);
+ }
+
+ return output;
+ }
+}