You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/29 08:46:48 UTC
[1/5] tajo git commit: TAJO-1661: Implement CORR function. (jihoon)
Repository: tajo
Updated Branches:
refs/heads/index_support 17dfe86cc -> 806469a26
TAJO-1661: Implement CORR function. (jihoon)
Closes #616
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aa49dc4a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aa49dc4a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aa49dc4a
Branch: refs/heads/index_support
Commit: aa49dc4a8af9f836e44896c516d8b0cdb738e5dd
Parents: 2ec307d
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jun 25 17:16:41 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jun 25 17:16:41 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/engine/function/builtin/Corr.java | 224 +++++++++++++++++++
tajo-core/src/main/proto/InternalTypes.proto | 9 +
.../engine/function/TestBuiltinFunctions.java | 33 +++
4 files changed, 268 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 425ac5d..9412179 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.11.0 - unreleased
NEW FEATURES
+ TAJO-1661: Implement CORR function. (jihoon)
+
TAJO-1537: Implement a virtual table for sessions.
(Contributed by Yongjin Choi, Committed by hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
new file mode 100644
index 0000000..310169f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Corr.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.InternalTypes.CorrProto;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.plan.function.AggFunction;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Compute the Pearson correlation coefficient corr(x, y), using the following
+ * stable one-pass method, based on:
+ * "Formulas for Robust, One-Pass Parallel Computation of Covariances and
+ * Arbitrary-Order Statistical Moments", Philippe Pebay, Sandia Labs
+ * and "The Art of Computer Programming, volume 2: Seminumerical Algorithms",
+ * Donald Knuth.
+ *
+ * Incremental:
+ * n : <count>
+ * mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg>
+ * my_n = my_(n-1) + [y_n - my_(n-1)]/n : <yavg>
+ * c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n) : <covariance * n>
+ * vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)): <x variance * n>
+ * vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): <y variance * n>
+ *
+ * Merge:
+ * c_(A,B) = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ * vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B)
+ * vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ *
+ */
+@Description(
+ functionName = "corr",
+ example = "> SELECT corr(expr, expr);",
+ description = "Returns the Pearson coefficient of correlation between a set of number pairs.\n" +
+ "The function takes as arguments any pair of numeric types and returns a double.\n"
+ + "Any pair with a NULL is ignored. If the function is applied to an empty set or\n"
+ + "a singleton set, NULL will be returned. Otherwise, it computes the following:\n"
+ + " COVAR_POP(x,y)/(STDDEV_POP(x)*STDDEV_POP(y))\n"
+ + "where neither x nor y is null,\n"
+ + "COVAR_POP is the population covariance,\n"
+ + "and STDDEV_POP is the population standard deviation.",
+ returnType = Type.FLOAT8,
+ paramTypes = {
+ @ParamTypes(paramTypes = {Type.INT8, Type.INT8}),
+ @ParamTypes(paramTypes = {Type.INT8, Type.INT4}),
+ @ParamTypes(paramTypes = {Type.INT4, Type.INT8}),
+ @ParamTypes(paramTypes = {Type.INT4, Type.INT4}),
+ @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT8}),
+ @ParamTypes(paramTypes = {Type.INT8, Type.FLOAT4}),
+ @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT8}),
+ @ParamTypes(paramTypes = {Type.INT4, Type.FLOAT4}),
+ @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT8}),
+ @ParamTypes(paramTypes = {Type.FLOAT8, Type.INT4}),
+ @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT8}),
+ @ParamTypes(paramTypes = {Type.FLOAT4, Type.INT4}),
+ @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT8}),
+ @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT8}),
+ @ParamTypes(paramTypes = {Type.FLOAT8, Type.FLOAT4}),
+ @ParamTypes(paramTypes = {Type.FLOAT4, Type.FLOAT4}),
+ }
+)
+public class Corr extends AggFunction<Datum> {
+
+ /**
+ * Evaluate the Pearson correlation coefficient using a stable one-pass
+ * algorithm, based on work by Philippe Pébay and Donald Knuth.
+ *
+ * Incremental:
+ * n : <count>
+ * mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg>
+ * my_n = my_(n-1) + [y_n - my_(n-1)]/n : <yavg>
+ * c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n - my_n) : <covariance * n>
+ * vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)): <x variance * n>
+ * vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): <y variance * n>
+ *
+ * Merge:
+ * c_(A,B) = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ * vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B)
+ * vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ *
+ */
+ public Corr() {
+ super(new Column[] {
+ new Column("expr", Type.FLOAT8),
+ new Column("expr", Type.FLOAT8)
+ });
+ }
+
+ public Corr(Column[] definedArgs) {
+ super(definedArgs);
+ }
+
+ @Override
+ public FunctionContext newContext() {
+ return new CorrContext();
+ }
+
+ @Override
+ public void eval(FunctionContext ctx, Tuple params) {
+ if (!params.isBlankOrNull(0) && !params.isBlankOrNull(1)) {
+ CorrContext corrContext = (CorrContext) ctx;
+ double vx = params.getFloat8(0);
+ double vy = params.getFloat8(1);
+ double deltaX = vx - corrContext.xavg;
+ double deltaY = vy - corrContext.yavg;
+ corrContext.count++;
+ corrContext.xavg += deltaX / corrContext.count;
+ corrContext.yavg += deltaY / corrContext.count;
+ if (corrContext.count > 1) {
+ corrContext.covar += deltaX * (vy - corrContext.yavg);
+ corrContext.xvar += deltaX * (vx - corrContext.xavg);
+ corrContext.yvar += deltaY * (vy - corrContext.yavg);
+ }
+ }
+ }
+
+ @Override
+ public void merge(FunctionContext ctx, Tuple part) {
+ CorrContext corrContext = (CorrContext) ctx;
+ if (part.isBlankOrNull(0)) {
+ return;
+ }
+ ProtobufDatum datum = (ProtobufDatum) part.getProtobufDatum(0);
+ CorrProto proto = (CorrProto) datum.get();
+ long nA = corrContext.count;
+ long nB = proto.getCount();
+
+ if (nA == 0) {
+ corrContext.count = proto.getCount();
+ corrContext.xavg = proto.getXavg();
+ corrContext.yavg = proto.getYavg();
+ corrContext.xvar = proto.getXvar();
+ corrContext.yvar = proto.getYvar();
+ corrContext.covar = proto.getCovar();
+ } else {
+ // Merge the two partials
+ double xavgA = corrContext.xavg;
+ double yavgA = corrContext.yavg;
+ double xavgB = proto.getXavg();
+ double yavgB = proto.getYavg();
+ double xvarB = proto.getXvar();
+ double yvarB = proto.getYvar();
+ double covarB = proto.getCovar();
+
+ corrContext.count += nB;
+ corrContext.xavg = (xavgA * nA + xavgB * nB) / corrContext.count;
+ corrContext.yavg = (yavgA * nA + yavgB * nB) / corrContext.count;
+ corrContext.xvar += xvarB + (xavgA - xavgB) * (xavgA - xavgB) * nA * nB / corrContext.count;
+ corrContext.yvar += yvarB + (yavgA - yavgB) * (yavgA - yavgB) * nA * nB / corrContext.count;
+ corrContext.covar +=
+ covarB + (xavgA - xavgB) * (yavgA - yavgB) * ((double) (nA * nB) / corrContext.count);
+ }
+ }
+
+ @Override
+ public Datum getPartialResult(FunctionContext ctx) {
+ CorrContext corrContext = (CorrContext) ctx;
+ if (corrContext.count == 0) {
+ return NullDatum.get();
+ }
+ CorrProto.Builder builder = CorrProto.newBuilder();
+ builder.setCount(corrContext.count);
+ builder.setXavg(corrContext.xavg);
+ builder.setYavg(corrContext.yavg);
+ builder.setXvar(corrContext.xvar);
+ builder.setYvar(corrContext.yvar);
+ builder.setCovar(corrContext.covar);
+ return new ProtobufDatum(builder.build());
+ }
+
+ @Override
+ public DataType getPartialResultType() {
+ return CatalogUtil.newDataType(Type.PROTOBUF, CorrProto.class.getName());
+ }
+
+ @Override
+ public Datum terminate(FunctionContext ctx) {
+ CorrContext corrContext = (CorrContext) ctx;
+
+ if (corrContext.count < 2) { // SQL standard - return null for zero or one pair
+ return NullDatum.get();
+ } else {
+ return DatumFactory.createFloat8(corrContext.covar
+ / java.lang.Math.sqrt(corrContext.xvar)
+ / java.lang.Math.sqrt(corrContext.yvar));
+ }
+ }
+
+ protected static class CorrContext implements FunctionContext {
+ long count = 0; // number n of elements
+ double xavg = 0; // average of x elements
+ double yavg = 0; // average of y elements
+ double xvar = 0; // n times the variance of x elements
+ double yvar = 0; // n times the variance of y elements
+ double covar = 0; // n times the covariance
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/main/proto/InternalTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto
index 13dd107..d23e244 100644
--- a/tajo-core/src/main/proto/InternalTypes.proto
+++ b/tajo-core/src/main/proto/InternalTypes.proto
@@ -36,3 +36,12 @@ message VarianceProto {
required double avg = 2;
required int64 count = 3;
}
+
+message CorrProto {
+ required int64 count = 1;
+ required double xavg = 2;
+ required double yavg = 3;
+ required double xvar = 4;
+ required double yvar = 5;
+ required double covar = 6;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/aa49dc4a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
index 5dae452..72fdd6f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestBuiltinFunctions.java
@@ -788,4 +788,37 @@ public class TestBuiltinFunctions extends QueryTestCaseBase {
executeString("DROP TABLE rank_table2 PURGE");
}
}
+
+ @Test
+ public void testCorr() throws Exception {
+ KeyValueSet tableOptions = new KeyValueSet();
+ tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+ Schema schema = new Schema();
+ schema.addColumn("id", TajoDataTypes.Type.INT4);
+ schema.addColumn("value_int", TajoDataTypes.Type.INT4);
+ schema.addColumn("value_long", TajoDataTypes.Type.INT8);
+ schema.addColumn("value_float", TajoDataTypes.Type.FLOAT4);
+ schema.addColumn("value_double", TajoDataTypes.Type.FLOAT8);
+ String[] data = new String[]{
+ "1|\\N|-111|1.2|-50.5",
+ "2|1|\\N|\\N|52.5",
+ "3|2|-333|2.8|\\N",
+ "4|3|-555|2.8|43.2",
+ "5|4|-111|1.1|10.2",};
+ TajoTestingCluster.createTable("testbuiltin11", schema, tableOptions, data, 1);
+
+ try {
+ ResultSet res = executeString("select corr(value_int, value_long) as corr1, corr(value_long, value_float) as corr2, corr(value_float, value_double) as corr3, corr(value_double, value_int) as corr4 from testbuiltin11");
+ String ascExpected = "corr1,corr2,corr3,corr4\n" +
+ "-------------------------------\n" +
+ "0.5,-0.9037045658322675,0.7350290063698216,-0.8761489936497805\n";
+
+ assertEquals(ascExpected, resultSetToString(res));
+ res.close();
+ } finally {
+ executeString("DROP TABLE testbuiltin11 PURGE");
+ }
+ }
}
[4/5] tajo git commit: TAJO-1638: Remove offset parameter from rest
api result/{cacheId}.
Posted by ji...@apache.org.
TAJO-1638: Remove offset parameter from rest api result/{cacheId}.
Closes #599
Signed-off-by: JaeHwa Jung <bl...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/773c3eb9
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/773c3eb9
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/773c3eb9
Branch: refs/heads/index_support
Commit: 773c3eb94ad3c3a6bc3b8db7692a9a38a5f3e8af
Parents: 8f3215d
Author: DaeMyung Kang <ch...@naver.com>
Authored: Mon Jun 29 11:12:17 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Mon Jun 29 11:12:17 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../ws/rs/resources/QueryResultResource.java | 35 +++-------
.../rs/resources/TestQueryResultResource.java | 73 +++-----------------
3 files changed, 23 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/773c3eb9/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 04c4392..90ef54a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1638: Remove offset parameter from rest api result/{cacheId}.
+ (Contributed by DaeMyung Kang, Committed by jaehwa)
+
TAJO-1659: Simplify scan iteration in SeqScan. (hyunsik)
TAJO-751: JDBC driver should support cancel() method.
http://git-wip-us.apache.org/repos/asf/tajo/blob/773c3eb9/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
index 3384c90..2f52198 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java
@@ -69,6 +69,9 @@ public class QueryResultResource {
private static final String countKeyName = "count";
private static final String tajoDigestHeaderName = "X-Tajo-Digest";
+ private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
+ private static final String tajoCountHeaderName = "X-Tajo-Count";
+ private static final String tajoEOSHeaderName = "X-Tajo-EOS";
public UriInfo getUriInfo() {
return uriInfo;
@@ -242,7 +245,6 @@ public class QueryResultResource {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId,
@PathParam("cacheId") String cacheId,
- @DefaultValue("-1") @QueryParam("offset") int offset,
@DefaultValue("100") @QueryParam("count") int count) {
if (LOG.isDebugEnabled()) {
LOG.debug("Client sent a get query result set request.");
@@ -257,9 +259,6 @@ public class QueryResultResource {
JerseyResourceDelegateContextKey<Long> cacheIdKey =
JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
context.put(cacheIdKey, Long.valueOf(cacheId));
- JerseyResourceDelegateContextKey<Integer> offsetKey =
- JerseyResourceDelegateContextKey.valueOf(offsetKeyName, Integer.class);
- context.put(offsetKey, offset);
JerseyResourceDelegateContextKey<Integer> countKey =
JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class);
context.put(countKey, count);
@@ -294,9 +293,6 @@ public class QueryResultResource {
JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey =
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.ClientApplicationKey, ClientApplication.class);
ClientApplication clientApplication = context.get(clientApplicationKey);
- JerseyResourceDelegateContextKey<Integer> offsetKey =
- JerseyResourceDelegateContextKey.valueOf(offsetKeyName, Integer.class);
- int offset = context.get(offsetKey);
JerseyResourceDelegateContextKey<Integer> countKey =
JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class);
int count = context.get(countKey);
@@ -329,14 +325,17 @@ public class QueryResultResource {
clientApplication.getCachedNonForwardResultScanner(queryIdObj, cacheId.longValue());
try {
- skipOffsetRow(cachedQueryResultScanner, offset);
-
+ int start_offset = cachedQueryResultScanner.getCurrentRowNumber();
List<ByteString> output = cachedQueryResultScanner.getNextRows(count);
String digestString = getEncodedBase64DigestString(output);
+ boolean eos = count != output.size();
return Response.ok(new QueryResultStreamingOutput(output))
- .header(tajoDigestHeaderName, digestString)
- .build();
+ .header(tajoDigestHeaderName, digestString)
+ .header(tajoOffsetHeaderName, start_offset)
+ .header(tajoCountHeaderName, output.size())
+ .header(tajoEOSHeaderName, eos)
+ .build();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
@@ -348,20 +347,6 @@ public class QueryResultResource {
}
}
- private void skipOffsetRow(NonForwardQueryResultScanner queryResultScanner, int offset) throws IOException {
- if (offset <= 0) {
- return;
- }
-
- int currentRow = queryResultScanner.getCurrentRowNumber();
-
- if (offset < (currentRow+1)) {
- throw new RuntimeException("Offset must be over the current row number");
- }
-
- queryResultScanner.getNextRows(offset - currentRow - 1);
- }
-
private String getEncodedBase64DigestString(List<ByteString> outputList) throws NoSuchAlgorithmException {
MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
http://git-wip-us.apache.org/repos/asf/tajo/blob/773c3eb9/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
index d4f1785..fec1626 100644
--- a/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
+++ b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java
@@ -63,6 +63,9 @@ public class TestQueryResultResource extends QueryTestCaseBase {
private static final String tajoSessionIdHeaderName = "X-Tajo-Session";
private static final String tajoDigestHeaderName = "X-Tajo-Digest";
+ private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
+ private static final String tajoCountHeaderName = "X-Tajo-Count";
+ private static final String tajoEOSHeaderName = "X-Tajo-EOS";
public TestQueryResultResource() {
super(TajoConstants.DEFAULT_DATABASE_NAME);
@@ -220,69 +223,6 @@ public class TestQueryResultResource extends QueryTestCaseBase {
}
@Test
- public void testGetQueryResultSetWithOffset() throws Exception {
- String sessionId = generateNewSessionAndGetId();
- URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
- URI queryResultURI = new URI(queryIdURI + "/result");
-
- GetQueryResultDataResponse response = restClient.target(queryResultURI)
- .request().header(tajoSessionIdHeaderName, sessionId)
- .get(new GenericType<GetQueryResultDataResponse>(GetQueryResultDataResponse.class));
-
- assertNotNull(response);
- assertNotNull(response.getResultCode());
- assertEquals(ResultCode.OK, response.getResultCode());
- assertNotNull(response.getSchema());
- assertEquals(16, response.getSchema().getRootColumns().size());
- assertNotNull(response.getResultset());
- assertTrue(response.getResultset().getId() != 0);
- assertNotNull(response.getResultset().getLink());
-
- URI queryResultSetURI = response.getResultset().getLink();
-
- Response queryResultSetResponse = restClient.target(queryResultSetURI)
- .queryParam("count", 100)
- .queryParam("offset", 3)
- .request().header(tajoSessionIdHeaderName, sessionId)
- .get();
-
- assertNotNull(queryResultSetResponse);
- String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
- assertTrue(tajoDigest != null && !tajoDigest.isEmpty());
-
- DataInputStream queryResultSetInputStream =
- new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
-
- assertNotNull(queryResultSetInputStream);
-
- boolean isFinished = false;
- List<Tuple> tupleList = TUtil.newList();
- RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema());
- MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
- while (!isFinished) {
- try {
- int length = queryResultSetInputStream.readInt();
- byte[] dataByteArray = new byte[length];
- int readBytes = queryResultSetInputStream.read(dataByteArray);
-
- assertEquals(length, readBytes);
-
- tupleList.add(decoder.toTuple(dataByteArray));
- messageDigest.update(dataByteArray);
- } catch (EOFException eof) {
- isFinished = true;
- }
- }
-
- assertEquals(3, tupleList.size());
- assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest()));
-
- for (Tuple aTuple: tupleList) {
- assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
- }
- }
-
- @Test
public void testGetQueryResultSetWithDefaultCount() throws Exception {
String sessionId = generateNewSessionAndGetId();
URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
@@ -309,7 +249,14 @@ public class TestQueryResultResource extends QueryTestCaseBase {
assertNotNull(queryResultSetResponse);
String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
+ int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName));
+ int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName));
+ boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName));
+
assertTrue(tajoDigest != null && !tajoDigest.isEmpty());
+ assertTrue(eos);
+ assertEquals(0, offset);
+ assertEquals(5, count);
DataInputStream queryResultSetInputStream =
new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));
[5/5] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Conflicts:
CHANGES
tajo-common/src/main/java/org/apache/tajo/SessionVars.java
tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/806469a2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/806469a2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/806469a2
Branch: refs/heads/index_support
Commit: 806469a26f7228b0ce7b6f0134dd72ee109accc3
Parents: 17dfe86 773c3eb
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Jun 29 15:29:01 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Jun 29 15:46:39 2015 +0900
----------------------------------------------------------------------
CHANGES | 14 +-
.../org/apache/tajo/catalog/CatalogUtil.java | 4 +-
.../java/org/apache/tajo/catalog/TableMeta.java | 92 +-------
.../main/java/org/apache/tajo/SessionVars.java | 5 +
.../java/org/apache/tajo/conf/TajoConf.java | 7 +-
.../tajo/engine/function/builtin/Corr.java | 224 +++++++++++++++++++
.../planner/physical/BSTIndexScanExec.java | 10 -
.../planner/physical/EmptyScanIterator.java | 40 ++++
.../engine/planner/physical/SeqScanExec.java | 111 +++++----
.../tajo/engine/utils/TupleCacheScanner.java | 114 ----------
.../ws/rs/resources/QueryResultResource.java | 35 +--
tajo-core/src/main/proto/InternalTypes.proto | 9 +
.../engine/function/TestBuiltinFunctions.java | 33 +++
.../tajo/engine/query/TestOuterJoinQuery.java | 2 +-
.../tajo/engine/query/TestTablePartitions.java | 106 ++++-----
.../rs/resources/TestQueryResultResource.java | 73 +-----
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../tajo/plan/LogicalPlanPreprocessor.java | 5 +-
.../org/apache/tajo/plan/LogicalPlanner.java | 10 +-
.../org/apache/tajo/plan/TablePropertyUtil.java | 92 ++++++++
.../org/apache/tajo/plan/util/PlannerUtil.java | 28 ---
.../apache/tajo/storage/AbstractScanner.java | 27 ++-
.../org/apache/tajo/storage/MergeScanner.java | 5 +-
.../org/apache/tajo/storage/NullScanner.java | 8 +-
.../java/org/apache/tajo/storage/Scanner.java | 9 +-
.../apache/tajo/storage/SeekableScanner.java | 4 +-
.../apache/tajo/storage/hbase/HBaseScanner.java | 6 +-
.../java/org/apache/tajo/storage/CSVFile.java | 4 +-
.../org/apache/tajo/storage/FileScanner.java | 6 -
.../org/apache/tajo/storage/FileTablespace.java | 7 +-
.../java/org/apache/tajo/storage/RawFile.java | 7 +
.../java/org/apache/tajo/storage/RowFile.java | 7 +
.../apache/tajo/storage/avro/AvroScanner.java | 7 +
.../tajo/storage/parquet/ParquetScanner.java | 7 +
.../org/apache/tajo/storage/rcfile/RCFile.java | 7 +
.../sequencefile/SequenceFileScanner.java | 7 +
.../tajo/storage/text/DelimitedTextFile.java | 7 +-
.../apache/tajo/storage/TestMergeScanner.java | 2 +-
.../org/apache/tajo/storage/TestStorages.java | 10 +-
39 files changed, 667 insertions(+), 485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/CHANGES
----------------------------------------------------------------------
diff --cc CHANGES
index ffed05c,90ef54a..746ed92
--- a/CHANGES
+++ b/CHANGES
@@@ -161,10 -166,9 +166,13 @@@ Release 0.11.0 - unrelease
BUG FIXES
+ TAJO-1608: Fix test failure in index_support branch. (jihoon)
+
+ TAJO-1594: Catalog schema is invalid for some databases. (jihoon)
+
+ TAJO-1644: When inserting empty data into a partitioned table,
+ existing data would be removed. (jaehwa)
+
TAJO-1642: CatalogServer need to check meta table first. (jaehwa)
TAJO-1650: TestQueryResource.testGetAllQueries() occasionally fails.
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --cc tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index ba6a0cd,28fdb0b..9fa14c2
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@@ -126,10 -126,10 +126,15 @@@ public enum SessionVars implements Conf
NULL_CHAR(ConfVars.$TEXT_NULL, "null char of text file output", DEFAULT),
CODEGEN(ConfVars.$CODEGEN, "Runtime code generation enabled (experiment)", DEFAULT),
+ // for index
+ INDEX_ENABLED(ConfVars.$INDEX_ENABLED, "index scan enabled", DEFAULT),
+ INDEX_SELECTIVITY_THRESHOLD(ConfVars.$INDEX_SELECTIVITY_THRESHOLD, "the selectivity threshold for index scan", DEFAULT),
+
++ // for partition overwrite
+ PARTITION_NO_RESULT_OVERWRITE_ENABLED(ConfVars.$PARTITION_NO_RESULT_OVERWRITE_ENABLED,
+ "If True, a partitioned table is overwritten even if a sub query leads to no result. "
+ + "Otherwise, the table data will be kept if there is no result", DEFAULT),
+
// Behavior Control ---------------------------------------------------------
ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT,
"If true, a running query will be terminated when an overflow or divide-by-zero occurs.", DEFAULT),
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 521b6b9,54abca8..19da87b
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@@ -96,98 -67,12 +96,88 @@@ public class BSTIndexScanExec extends P
this.reader.open();
}
+ private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) {
+ Schema mergedSchema = new Schema();
+ Set<Column> qualAndTargets = TUtil.newHashSet();
+ qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
+ for (Target target : targets) {
+ qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
+ }
+ for (Column column : originalSchema.getRootColumns()) {
+ if (subSchema.contains(column)
+ || qualAndTargets.contains(column)
+ || qualAndTargets.contains(column)) {
+ mergedSchema.addColumn(column);
+ }
+ }
+ return mergedSchema;
+ }
+
@Override
public void init() throws IOException {
+ Schema projected;
+
+ // in the case where projected column or expression are given
+ // the target can be an empty list.
+ if (plan.hasTargets()) {
+ projected = new Schema();
+ Set<Column> columnSet = new HashSet<Column>();
+
+ if (plan.hasQual()) {
+ columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+ }
+
+ for (Target t : plan.getTargets()) {
+ columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
+ }
+
+ for (Column column : inSchema.getAllColumns()) {
+ if (columnSet.contains(column)) {
+ projected.addColumn(column);
+ }
+ }
+
+ } else {
+ // No projected columns were given, meaning that all columns should be projected.
+ // TODO - this implicit rule hurts code readability, so we should remove it later.
+ projected = outSchema;
+ }
+
+ initScanner(projected);
super.init();
progress = 0.0f;
- if (qual != null) {
- qual.bind(context.getEvalContext(), inSchema);
+
+ if (plan.hasQual()) {
+ if (fileScanner.isProjectable()) {
+ qual.bind(context.getEvalContext(), projected);
+ } else {
+ qual.bind(context.getEvalContext(), inSchema);
+ }
+ }
+ }
+
+ private void initScanner(Schema projected) throws IOException {
+
- TableMeta meta;
- try {
- meta = (TableMeta) plan.getTableDesc().getMeta().clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
-
- // set system default properties
- PlannerUtil.applySystemDefaultToTableProperties(context.getQueryContext(), meta);
-
+ // Why should we check for null? See https://issues.apache.org/jira/browse/TAJO-1422
+ if (fragment != null) {
+
+ Schema fileScanOutSchema = mergeSubSchemas(projected, keySchema, plan.getTargets(), qual);
+
+ this.fileScanner = OldStorageManager.getStorageManager(context.getConf(),
+ plan.getTableDesc().getMeta().getStoreType())
+ .getSeekableScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragment, fileScanOutSchema);
+ this.fileScanner.init();
+
+ // See the Scanner.isProjectable() method. Depending on the result of isProjectable(),
+ // the width of the retrieved tuple changes.
+ //
+ // If TRUE, the retrieved tuple will contain only projected fields.
+ // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields.
+ if (fileScanner.isProjectable()) {
+ this.projector = new Projector(context, projected, outSchema, plan.getTargets());
+ } else {
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --cc tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 46c879c,5c2ffe3..1f878f1
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@@ -35,8 -35,7 +35,9 @@@ Available Session Variables
\set MAX_OUTPUT_FILE_SIZE [int value] - Maximum per-output file size (mb). 0 means infinite.
\set NULL_CHAR [text value] - null char of text file output
\set CODEGEN [true or false] - Runtime code generation enabled (experiment)
+\set INDEX_ENABLED [true or false] - index scan enabled
+\set INDEX_SELECTIVITY_THRESHOLD [real value] - the selectivity threshold for index scan
+ \set PARTITION_NO_RESULT_OVERWRITE_ENABLED [true or false] - If True, a partitioned table is overwritten even if a sub query leads to no result. Otherwise, the table data will be kept if there is no result
\set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
\set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/806469a2/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
[2/5] tajo git commit: TAJO-1644: When inserting empty data into a
partitioned table, existing data would be removed. (jaehwa)
Posted by ji...@apache.org.
TAJO-1644: When inserting empty data into a partitioned table, existing data would be removed. (jaehwa)
Closes #601
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f57d6c43
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f57d6c43
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f57d6c43
Branch: refs/heads/index_support
Commit: f57d6c43fd201326fef2a695b1d1e798d0f814e3
Parents: aa49dc4
Author: JaeHwa Jung <bl...@apache.org>
Authored: Fri Jun 26 11:11:09 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Fri Jun 26 11:11:09 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../main/java/org/apache/tajo/SessionVars.java | 4 +
.../java/org/apache/tajo/conf/TajoConf.java | 7 +-
.../tajo/engine/query/TestOuterJoinQuery.java | 2 +-
.../tajo/engine/query/TestTablePartitions.java | 106 ++++++++-----------
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../org/apache/tajo/storage/FileTablespace.java | 7 +-
7 files changed, 66 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9412179..e0cbd47 100644
--- a/CHANGES
+++ b/CHANGES
@@ -163,6 +163,9 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1644: When inserting empty data into a partitioned table,
+ existing data would be removed. (jaehwa)
+
TAJO-1642: CatalogServer need to check meta table first. (jaehwa)
TAJO-1650: TestQueryResource.testGetAllQueries() occasionally fails.
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 98c2f3e..28fdb0b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -126,6 +126,10 @@ public enum SessionVars implements ConfigKey {
NULL_CHAR(ConfVars.$TEXT_NULL, "null char of text file output", DEFAULT),
CODEGEN(ConfVars.$CODEGEN, "Runtime code generation enabled (experiment)", DEFAULT),
+ PARTITION_NO_RESULT_OVERWRITE_ENABLED(ConfVars.$PARTITION_NO_RESULT_OVERWRITE_ENABLED,
+ "If True, a partitioned table is overwritten even if a sub query leads to no result. "
+ + "Otherwise, the table data will be kept if there is no result", DEFAULT),
+
// Behavior Control ---------------------------------------------------------
ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT,
"If true, a running query will be terminated when an overflow or divide-by-zero occurs.", DEFAULT),
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ba777c1..14cfb11 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -323,7 +323,6 @@ public class TajoConf extends Configuration {
$DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
$DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128),
$DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128),
-
$DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128, Validators.min("1")),
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256, Validators.min("1")),
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256, Validators.min("1")),
@@ -376,7 +375,11 @@ public class TajoConf extends Configuration {
// Behavior Control ---------------------------------------------------------
$BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
- // ResultSet ---------------------------------------------------------
+ // If True, a partitioned table is overwritten even if a sub query leads to no result.
+ // Otherwise, the table data will be kept if there is no result
+ $PARTITION_NO_RESULT_OVERWRITE_ENABLED("tajo.partition.overwrite.even-if-no-result", false),
+
+ // ResultSet ---------------------------------------------------------
$RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
$RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true),
;
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
index 9445557..9d0e0bc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
@@ -251,7 +251,7 @@ public class TestOuterJoinQuery extends TestJoinQuery {
}
@Test
- @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true, sort = true)
@SimpleTest(queries = {
@QuerySpec("select t1.id, t1.name, t2.id, t3.id, t4.id\n" +
"from jointable11 t1\n" +
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 397b9ef..ef57356 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -56,6 +56,7 @@ import java.util.*;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class TestTablePartitions extends QueryTestCaseBase {
@@ -437,18 +438,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
Path path = new Path(desc.getUri());
FileSystem fs = FileSystem.get(conf);
- assertTrue(fs.isDirectory(path));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+ verifyDirectoriesForThreeColumns(fs, path, 1);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
@@ -488,22 +478,11 @@ public class TestTablePartitions extends QueryTestCaseBase {
desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
path = new Path(desc.getUri());
- assertTrue(fs.isDirectory(path));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
+ verifyDirectoriesForThreeColumns(fs, path, 2);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
+
String expected = "N\n" +
"N\n" +
"N\n" +
@@ -548,54 +527,61 @@ public class TestTablePartitions extends QueryTestCaseBase {
+ " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1");
res.close();
- desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
- assertTrue(fs.isDirectory(path));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
+ verifyDirectoriesForThreeColumns(fs, path, 3);
if (!testingCluster.isHiveCatalogStoreRunning()) {
// TODO: If there is existing another partition directory, we must add its rows number to result row numbers.
+ // desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
// assertEquals(6, desc.getStats().getNumRows().intValue());
}
- res = executeString("select * from " + tableName + " where col2 = 1");
- resultSetData = resultSetToString(res);
- res.close();
- expected = "col4,col1,col2,col3\n" +
- "-------------------------------\n" +
- "N,1,1,17.0\n" +
- "N,1,1,17.0\n" +
- "N,1,1,30.0\n" +
- "N,1,1,36.0\n" +
- "N,1,1,36.0\n";
-
- assertEquals(expected, resultSetData);
+ verifyKeptExistingData(res, tableName);
// insert overwrite empty result to partitioned table
res = executeString("insert overwrite into " + tableName
- + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey" +
- " > 100");
+ + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey > 100");
res.close();
- desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+ verifyDirectoriesForThreeColumns(fs, path, 4);
+ verifyKeptExistingData(res, tableName);
- ContentSummary summary = fs.getContentSummary(new Path(desc.getUri()));
+ executeString("DROP TABLE " + tableName + " PURGE").close();
+ }
- assertEquals(summary.getDirectoryCount(), 1L);
- assertEquals(summary.getFileCount(), 0L);
- assertEquals(summary.getLength(), 0L);
+ private final void verifyKeptExistingData(ResultSet res, String tableName) throws Exception {
+ res = executeString("select * from " + tableName + " where col2 = 1");
+ String resultSetData = resultSetToString(res);
+ res.close();
+ String expected = "col4,col1,col2,col3\n" +
+ "-------------------------------\n" +
+ "N,1,1,17.0\n" +
+ "N,1,1,17.0\n" +
+ "N,1,1,30.0\n" +
+ "N,1,1,36.0\n" +
+ "N,1,1,36.0\n";
- executeString("DROP TABLE " + tableName + " PURGE").close();
+ assertEquals(expected, resultSetData);
+ }
+
+ private final void verifyDirectoriesForThreeColumns(FileSystem fs, Path path, int step) throws Exception {
+ assertTrue(fs.isDirectory(path));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+
+ if (step == 1 || step == 2) {
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+ } else {
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
+ }
+
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
}
@Test
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 137b0de..5c2ffe3 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -35,6 +35,7 @@ Available Session Variables:
\set MAX_OUTPUT_FILE_SIZE [int value] - Maximum per-output file size (mb). 0 means infinite.
\set NULL_CHAR [text value] - null char of text file output
\set CODEGEN [true or false] - Runtime code generation enabled (experiment)
+\set PARTITION_NO_RESULT_OVERWRITE_ENABLED [true or false] - If True, a partitioned table is overwritten even if a sub query leads to no result. Otherwise, the table data will be kept if there is no result
\set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
\set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
http://git-wip-us.apache.org/repos/asf/tajo/blob/f57d6c43/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 3b63012..e8a6c12 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -981,7 +981,12 @@ public class FileTablespace extends Tablespace {
Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
ContentSummary summary = fs.getContentSummary(stagingResultDir);
- if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) {
+ // When inserting empty data into a partitioned table, check whether the existing data should be removed or kept.
+ boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED);
+
+ // If existing data doesn't need to be kept, check whether there are any files.
+ if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty())
+ && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) {
// This is a map for existing non-leaf directory to rename. A key is current directory and a value is
// renaming directory.
Map<Path, Path> renameDirs = TUtil.newHashMap();
[3/5] tajo git commit: TAJO-1658: Filter push down to underlying
storages.
Posted by ji...@apache.org.
TAJO-1658: Filter push down to underlying storages.
Closes #613
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8f3215d1
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8f3215d1
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8f3215d1
Branch: refs/heads/index_support
Commit: 8f3215d12346d71808a43bf4e2a22e1edd8874ff
Parents: f57d6c4
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sun Jun 28 02:12:11 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sun Jun 28 02:12:11 2015 -0700
----------------------------------------------------------------------
CHANGES | 6 +-
.../org/apache/tajo/catalog/CatalogUtil.java | 4 +-
.../java/org/apache/tajo/catalog/TableMeta.java | 92 +--------------
.../planner/physical/EmptyScanIterator.java | 40 +++++++
.../engine/planner/physical/SeqScanExec.java | 111 ++++++++++--------
.../tajo/engine/utils/TupleCacheScanner.java | 114 -------------------
.../tajo/plan/LogicalPlanPreprocessor.java | 5 +-
.../org/apache/tajo/plan/LogicalPlanner.java | 10 +-
.../org/apache/tajo/plan/TablePropertyUtil.java | 92 +++++++++++++++
.../org/apache/tajo/plan/util/PlannerUtil.java | 28 -----
.../apache/tajo/storage/AbstractScanner.java | 27 +++--
.../org/apache/tajo/storage/MergeScanner.java | 5 +-
.../org/apache/tajo/storage/NullScanner.java | 8 +-
.../java/org/apache/tajo/storage/Scanner.java | 9 +-
.../apache/tajo/storage/SeekableScanner.java | 4 +-
.../apache/tajo/storage/hbase/HBaseScanner.java | 6 +-
.../java/org/apache/tajo/storage/CSVFile.java | 4 +-
.../org/apache/tajo/storage/FileScanner.java | 6 -
.../java/org/apache/tajo/storage/RawFile.java | 7 ++
.../java/org/apache/tajo/storage/RowFile.java | 7 ++
.../apache/tajo/storage/avro/AvroScanner.java | 7 ++
.../tajo/storage/parquet/ParquetScanner.java | 7 ++
.../org/apache/tajo/storage/rcfile/RCFile.java | 7 ++
.../sequencefile/SequenceFileScanner.java | 7 ++
.../tajo/storage/text/DelimitedTextFile.java | 7 +-
.../apache/tajo/storage/TestMergeScanner.java | 2 +-
.../org/apache/tajo/storage/TestStorages.java | 10 +-
27 files changed, 309 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e0cbd47..04c4392 100644
--- a/CHANGES
+++ b/CHANGES
@@ -64,8 +64,8 @@ Release 0.11.0 - unreleased
TAJO-1603: Refactor StorageManager. (hyunsik)
- TAJO-1542: Refactoring of HashJoinExecs. (Contributed by Navis, Committed by
- hyunsik)
+ TAJO-1542: Refactoring of HashJoinExecs. (Contributed by Navis,
+ Committed by hyunsik)
TAJO-1591: Change StoreType represented as Enum to String type. (hyunsik)
@@ -357,6 +357,8 @@ Release 0.11.0 - unreleased
SUB TASKS
+ TAJO-1658: Filter push down to underlying storages. (hyunsik)
+
TAJO-1616: Implement TablespaceManager to load Tablespaces. (hyunsik)
TAJO-1615: Implement TaskManager. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 6c6915b..638ebca 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -313,7 +313,7 @@ public class CatalogUtil {
}
public static TableMeta newTableMeta(String storeType) {
- KeyValueSet defaultProperties = CatalogUtil.newPhysicalProperties(storeType);
+ KeyValueSet defaultProperties = CatalogUtil.newDefaultProperty(storeType);
return new TableMeta(storeType, defaultProperties);
}
@@ -871,7 +871,7 @@ public class CatalogUtil {
* @param storeType StoreType
* @return Table properties
*/
- public static KeyValueSet newPhysicalProperties(String storeType) {
+ public static KeyValueSet newDefaultProperty(String storeType) {
KeyValueSet options = new KeyValueSet();
if (storeType.equalsIgnoreCase("CSV") || storeType.equalsIgnoreCase("TEXT")) {
options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
index 2b31b83..6838fe0 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
@@ -24,9 +24,7 @@ import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableProtoOrBuilder;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.util.KeyValueSet;
@@ -37,95 +35,44 @@ import java.util.Map;
* It contains all information for scanning a fragmented table
*/
public class TableMeta implements ProtoObject<CatalogProtos.TableProto>, GsonObject, Cloneable {
- protected TableProto.Builder builder = null;
- private TableProto proto = TableProto.getDefaultInstance();
- private boolean viaProto = false;
-
@Expose protected String storeType;
@Expose protected KeyValueSet options;
- private TableMeta() {
- builder = TableProto.newBuilder();
- }
-
public TableMeta(String storeType, KeyValueSet options) {
- this();
this.storeType = storeType;
this.options = new KeyValueSet(options);
}
public TableMeta(TableProto proto) {
- this.proto = proto;
- viaProto = true;
+ this.storeType = proto.getStoreType();
+ this.options = new KeyValueSet(proto.getParams());
}
public String getStoreType() {
- TableProtoOrBuilder p = viaProto ? proto : builder;
- if (this.storeType != null) {
- return storeType;
- }
- if (!p.hasStoreType()) {
- return null;
- }
- this.storeType = p.getStoreType();
return this.storeType;
}
public void setOptions(KeyValueSet options) {
- maybeInitBuilder();
this.options = options;
}
public void putOption(String key, String val) {
- maybeInitBuilder();
options.set(key, val);
}
public boolean containsOption(String key) {
- TableProtoOrBuilder p = viaProto ? proto : builder;
- if (options != null) {
- return this.options.containsKey(key);
- }
- if (!p.hasParams()) {
- return false;
- }
- this.options = new KeyValueSet(p.getParams());
return options.containsKey(key);
}
public String getOption(String key) {
- TableProtoOrBuilder p = viaProto ? proto : builder;
- if (options != null) {
- return this.options.get(key);
- }
- if (!p.hasParams()) {
- return null;
- }
- this.options = new KeyValueSet(p.getParams());
return options.get(key);
}
public String getOption(String key, String defaultValue) {
- TableProtoOrBuilder p = viaProto ? proto : builder;
- if (options != null) {
- return this.options.get(key, defaultValue);
- }
- if (!p.hasParams()) {
- return null;
- }
- this.options = new KeyValueSet(p.getParams());
return options.get(key, defaultValue);
}
public KeyValueSet getOptions() {
- TableProtoOrBuilder p = viaProto ? proto : builder;
- if (options != null) {
- return this.options;
- }
- if (!p.hasParams()) {
- return null;
- }
- this.options = new KeyValueSet(p.getParams());
return options;
}
@@ -152,10 +99,8 @@ public class TableMeta implements ProtoObject<CatalogProtos.TableProto>, GsonObj
@Override
public Object clone() throws CloneNotSupportedException {
TableMeta meta = (TableMeta) super.clone();
- meta.builder = TableProto.newBuilder();
meta.storeType = getStoreType();
meta.options = (KeyValueSet) (toMap() != null ? options.clone() : null);
-
return meta;
}
@@ -169,10 +114,10 @@ public class TableMeta implements ProtoObject<CatalogProtos.TableProto>, GsonObj
// ProtoObject
////////////////////////////////////////////////////////////////////////
public TableProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
+ TableProto.Builder builder = TableProto.newBuilder();
+ builder.setStoreType(storeType);
+ builder.setParams(options.getProto());
+ return builder.build();
}
@Override
@@ -185,29 +130,4 @@ public class TableMeta implements ProtoObject<CatalogProtos.TableProto>, GsonObj
getStoreType();
toMap();
}
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = TableProto.newBuilder(proto);
- }
- viaProto = true;
- }
-
- private void mergeLocalToBuilder() {
- if (storeType != null) {
- builder.setStoreType(storeType);
- }
- if (this.options != null) {
- builder.setParams(options.getProto());
- }
- }
-
- private void mergeLocalToProto() {
- if(viaProto) {
- maybeInitBuilder();
- }
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EmptyScanIterator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EmptyScanIterator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EmptyScanIterator.java
new file mode 100644
index 0000000..cfbd3ed
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EmptyScanIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Empty Iterator
+ */
+public class EmptyScanIterator implements ScanIterator {
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return false;
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ throw new IOException(this.getClass().getSimpleName() + "::next() is invoked.");
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 79e0a5d..b4f7a38 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -19,10 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -37,7 +34,6 @@ import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.expr.FieldEval;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
-import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
@@ -137,8 +133,7 @@ public class SeqScanExec extends ScanExec {
}
}
- @Override
- public void init() throws IOException {
+ public Schema getProjectSchema() {
Schema projected;
// in the case where projected column or expression are given
@@ -167,23 +162,55 @@ public class SeqScanExec extends ScanExec {
projected = outSchema;
}
- initScanner(projected);
- super.init();
-
- if (plan.hasQual()) {
- if (scanner.isProjectable()) {
- qual.bind(context.getEvalContext(), projected);
- } else {
- qual.bind(context.getEvalContext(), inSchema);
- }
+ return projected;
+ }
+ private void initScanIterator() {
+ // We should use FilterScanIterator only if underlying storage does not support filter push down.
+ if (plan.hasQual() && !scanner.isSelectable()) {
scanIt = new FilterScanIterator(scanner, qual);
+
} else {
+ if (scanner.isSelectable()) { // TODO - isSelectable should be moved to FormatProperty
+ scanner.setFilter(qual);
+ }
scanIt = new FullScanIterator(scanner);
}
}
@Override
+ public void init() throws IOException {
+
+ // Why should we check nullity? See https://issues.apache.org/jira/browse/TAJO-1422
+
+ if (fragments == null) {
+ scanIt = new EmptyScanIterator();
+
+ } else {
+ Schema projectedFields = getProjectSchema();
+ initScanner(projectedFields);
+
+ // See Scanner.isProjectable() method. Depending on the result of isProjectable(),
+ // the width of retrieved tuple is changed.
+ //
+ // If projectable, the retrieved tuple will contain only projected fields.
+ // Otherwise, the retrieved tuple will contain projected fields and NullDatum
+ // for non-projected fields.
+ Schema actualInSchema = scanner.isProjectable() ? projectedFields : inSchema;
+
+ this.projector = new Projector(context, actualInSchema, outSchema, plan.getTargets());
+
+ if (plan.hasQual()) {
+ qual.bind(context.getEvalContext(), actualInSchema);
+ }
+
+ initScanIterator();
+ }
+
+ super.init();
+ }
+
+ @Override
protected void compile() throws CompilationError {
if (plan.hasQual()) {
qual = context.getPrecompiledEval(inSchema, qual);
@@ -191,47 +218,33 @@ public class SeqScanExec extends ScanExec {
}
private void initScanner(Schema projected) throws IOException {
-
- TableMeta meta;
- try {
- meta = (TableMeta) plan.getTableDesc().getMeta().clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
+ TableDesc table = plan.getTableDesc();
+ TableMeta meta = table.getMeta();
- // set system default properties
- PlannerUtil.applySystemDefaultToTableProperties(context.getQueryContext(), meta);
+ if (fragments.length > 1) {
- // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422
- if (fragments != null) {
- if (fragments.length > 1) {
- this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), meta,
- FragmentConvertor.convert(context.getConf(), fragments), projected
- );
- } else {
- Tablespace tablespace = TablespaceManager.get(plan.getTableDesc().getUri()).get();
- this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected);
- }
- scanner.init();
+ this.scanner = new MergeScanner(
+ context.getConf(),
+ plan.getPhysicalSchema(), meta,
+ FragmentConvertor.convert(context.getConf(), fragments),
+ projected
+ );
- // See Scanner.isProjectable() method Depending on the result of isProjectable(),
- // the width of retrieved tuple is changed.
- //
- // If TRUE, the retrieved tuple will contain only projected fields.
- // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields.
- if (scanner.isProjectable()) {
- this.projector = new Projector(context, projected, outSchema, plan.getTargets());
- } else {
- this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
- }
+ } else {
+
+ Tablespace tablespace = TablespaceManager.get(table.getUri()).get();
+ this.scanner = tablespace.getScanner(
+ meta,
+ plan.getPhysicalSchema(),
+ fragments[0],
+ projected);
}
+
+ scanner.init();
}
@Override
public Tuple next() throws IOException {
- if (fragments == null) {
- return null;
- }
while(scanIt.hasNext()) {
Tuple outTuple = new VTuple(outColumnNum);
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
deleted file mode 100644
index 0fd2fbe..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheScanner.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-public class TupleCacheScanner implements Scanner {
- List<Tuple> cacheData;
- Schema schema;
- Iterator<Tuple> it;
- int count;
- TableStats inputStats = new TableStats();
-
- public TupleCacheScanner(List<Tuple> cacheData, Schema schema) {
- this.cacheData = cacheData;
- this.schema = schema;
- }
- @Override
- public void init() throws IOException {
- inputStats.setNumRows(cacheData.size());
- inputStats.setReadBytes(0);
- it = cacheData.iterator();
- count = 0;
- }
-
- @Override
- public Tuple next() throws IOException {
- if (it.hasNext()) {
- count++;
- Tuple tuple = it.next();
- try {
- return (Tuple)tuple.clone();
- } catch (CloneNotSupportedException e) {
- throw new IOException(e.getMessage(), e);
- }
- } else {
- return null;
- }
- }
-
- @Override
- public void reset() throws IOException {
- init();
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public boolean isProjectable() {
- return false;
- }
-
- @Override
- public void setTarget(Column[] targets) {
- }
-
- @Override
- public boolean isSelectable() {
- return true;
- }
-
- @Override
- public void setSearchCondition(Object expr) {
- }
-
- @Override
- public boolean isSplittable() {
- return false;
- }
-
- @Override
- public float getProgress() {
- if (cacheData.size() == 0) {
- return 1.0f;
- }
- return ((float)count) / cacheData.size();
- }
-
- @Override
- public TableStats getInputStats() {
- return inputStats;
- }
-
- @Override
- public Schema getSchema() {
- return schema;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
index fbad76e..dced4d3 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
@@ -383,14 +383,17 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanner.P
}
TableDesc desc = catalog.getTableDesc(actualRelationName);
+
ScanNode scanNode = ctx.plan.createNode(ScanNode.class);
if (relation.hasAlias()) {
scanNode.init(desc, relation.getAlias());
} else {
scanNode.init(desc);
}
- ctx.queryBlock.addRelation(scanNode);
+ TablePropertyUtil.setTableProperty(ctx.getQueryContext(), scanNode);
+
+ ctx.queryBlock.addRelation(scanNode);
return scanNode;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index a2480c9..c51d068 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -1792,20 +1792,20 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
// Set default storage properties to table
- KeyValueSet properties = CatalogUtil.newPhysicalProperties(createTableNode.getStorageType());
+ createTableNode.setOptions(CatalogUtil.newDefaultProperty(createTableNode.getStorageType()));
// Priority to apply table properties
// 1. Explicit table properties specified in WITH clause
// 2. Session variables
// Set session variables to properties
- PlannerUtil.applySessionToTableProperties(context.queryContext, createTableNode.getStorageType(), properties);
- // Set table properties specified in WITH clause
+ TablePropertyUtil.setTableProperty(context.queryContext, createTableNode);
+
+ // Set table property specified in WITH clause and it will override all others
if (expr.hasParams()) {
- properties.putAll(expr.getParams());
+ createTableNode.getOptions().putAll(expr.getParams());
}
- createTableNode.setOptions(properties);
if (expr.hasPartition()) {
if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
new file mode 100644
index 0000000..5576889
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan;
+
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.CreateTableNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+
+/**
+ * A utility class for table properties
+ */
+public class TablePropertyUtil {
+ /**
+ * It sets the default table properties for a newly created table.
+ *
+ * @param context QueryContext
+ * @param node CreateTableNode
+ */
+ public static void setTableProperty(OverridableConf context, CreateTableNode node) {
+ String storeType = node.getStorageType();
+ KeyValueSet property = node.getOptions();
+
+ if (storeType.equalsIgnoreCase("CSV") || storeType.equalsIgnoreCase("TEXT")) {
+ setSessionToProperty(context, SessionVars.NULL_CHAR, property, StorageConstants.TEXT_NULL);
+ }
+
+ setSessionToProperty(context, SessionVars.TIMEZONE, property, StorageConstants.TIMEZONE);
+ }
+
+ private static void setSessionToProperty(OverridableConf context,
+ SessionVars sessionVarKey,
+ KeyValueSet property,
+ String propertyKey) {
+
+ if (context.containsKey(sessionVarKey)) {
+ property.set(propertyKey, context.get(sessionVarKey));
+ }
+ }
+
+ /**
+ * It sets default table properties affected by the system's global configuration.
+ * The table properties are implicitly used to read table rows.
+ *
+ * @param context QueryContext
+ * @param node ScanNode
+ */
+ public static void setTableProperty(OverridableConf context, ScanNode node) {
+ TableMeta meta = node.getTableDesc().getMeta();
+
+ setProperty(context, SessionVars.TIMEZONE, meta, StorageConstants.TIMEZONE);
+ setProperty(context, SessionVars.NULL_CHAR, meta, StorageConstants.TEXT_NULL);
+ }
+
+ /**
+ * If there is no table property for the propertyKey, set default property to the table.
+ * If session variable is set, it is set to the table property. Otherwise, the default property
+ * in the system conf will be used.
+ *
+ * @param context QueryContext
+ * @param sessionVarKey session variable key
+ * @param meta TableMeta
+ * @param propertyKey table property key
+ */
+ private static void setProperty(OverridableConf context, SessionVars sessionVarKey,
+ TableMeta meta, String propertyKey) {
+
+ if (!meta.containsOption(propertyKey)) {
+ meta.putOption(propertyKey, context.get(sessionVarKey));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 441e047..19e6ad1 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -850,34 +850,6 @@ public class PlannerUtil {
return explains.toString();
}
- public static void applySessionToTableProperties(OverridableConf sessionVars,
- String storeType,
- KeyValueSet tableProperties) {
- if (storeType.equalsIgnoreCase("CSV") || storeType.equalsIgnoreCase("TEXT")) {
- if (sessionVars.containsKey(SessionVars.NULL_CHAR)) {
- tableProperties.set(StorageConstants.TEXT_NULL, sessionVars.get(SessionVars.NULL_CHAR));
- }
-
- if (sessionVars.containsKey(SessionVars.TIMEZONE)) {
- tableProperties.set(StorageConstants.TIMEZONE, sessionVars.get(SessionVars.TIMEZONE));
- }
- }
- }
-
- /**
- * This method sets a set of table properties by System default configs.
- * These properties are implicitly used to read or write rows in Table.
- * Don't use this method for TableMeta to be stored in Catalog.
- *
- * @param systemConf System configuration
- * @param meta TableMeta to be set
- */
- public static void applySystemDefaultToTableProperties(OverridableConf systemConf, TableMeta meta) {
- if (!meta.containsOption(StorageConstants.TIMEZONE)) {
- meta.putOption(StorageConstants.TIMEZONE, systemConf.get(SessionVars.TIMEZONE));
- }
- }
-
public static boolean isFileStorageType(String storageType) {
if (storageType.equalsIgnoreCase("hbase")) {
return false;
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
index 3719412..07fe353 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
@@ -21,60 +21,69 @@ package org.apache.tajo.storage;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import java.io.IOException;
-// dummy scanner
+/**
+ * It's a dummy base class so that subclasses do not have to implement every method.
+ */
public abstract class AbstractScanner implements Scanner {
@Override
public void init() throws IOException {
-
+ throw new UnimplementedException();
}
@Override
public void reset() throws IOException {
+ throw new UnimplementedException();
}
@Override
public void close() throws IOException {
+ throw new UnimplementedException();
}
@Override
public boolean isProjectable() {
- return false;
+ throw new UnimplementedException();
}
@Override
public void setTarget(Column[] targets) {
+ throw new UnsupportedException();
}
@Override
public boolean isSelectable() {
- return false;
+ throw new UnimplementedException();
}
@Override
- public void setSearchCondition(Object expr) {
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
}
@Override
public boolean isSplittable() {
- return false;
+ throw new UnimplementedException();
}
@Override
public float getProgress() {
- return 0;
+ throw new UnimplementedException();
}
@Override
public TableStats getInputStats() {
- return null;
+ throw new UnimplementedException();
}
@Override
public Schema getSchema() {
- return null;
+ throw new UnimplementedException();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 67a2f86..87f0310 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -25,6 +25,8 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
@@ -165,7 +167,8 @@ public class MergeScanner implements Scanner {
}
@Override
- public void setSearchCondition(Object expr) {
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
index 4272228..83d8e24 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -21,6 +21,8 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
@@ -79,12 +81,12 @@ public class NullScanner implements Scanner {
@Override
public boolean isSelectable() {
- return true;
+ return false;
}
@Override
- public void setSearchCondition(Object expr) {
-
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
index 7af8247..2fcb2fd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SchemaObject;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.plan.expr.EvalNode;
import java.io.Closeable;
import java.io.IOException;
@@ -28,8 +29,8 @@ import java.io.IOException;
/**
* Scanner Interface
*/
-public interface Scanner extends SchemaObject, Closeable {
+public interface Scanner extends SchemaObject, Closeable {
void init() throws IOException;
/**
@@ -88,12 +89,12 @@ public interface Scanner extends SchemaObject, Closeable {
boolean isSelectable();
/**
- * Set a search condition
- * @param expr to be searched
+ * Set a filter condition
+ * @param filter to be searched
*
* TODO - to be changed Object type
*/
- void setSearchCondition(Object expr);
+ void setFilter(EvalNode filter);
/**
* It returns if the file is splittable.
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
index 894e7ee..41d0872 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
@@ -22,7 +22,7 @@ import java.io.IOException;
public interface SeekableScanner extends Scanner {
- public abstract long getNextOffset() throws IOException;
+ long getNextOffset() throws IOException;
- public abstract void seek(long offset) throws IOException;
+ void seek(long offset) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 7369897..11851ec 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -36,6 +36,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;
@@ -424,8 +426,8 @@ public class HBaseScanner implements Scanner {
}
@Override
- public void setSearchCondition(Object expr) {
- // TODO implements adding column filter to scanner.
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index ee3095c..8b8ca76 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -35,6 +35,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
@@ -536,7 +537,8 @@ public class CSVFile {
}
@Override
- public void setSearchCondition(Object expr) {
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
index 0726125..8844fa5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -89,12 +89,6 @@ public abstract class FileScanner implements Scanner {
this.targets = targets;
}
- public void setSearchCondition(Object expr) {
- if (inited) {
- throw new IllegalStateException("Should be called before init()");
- }
- }
-
public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
FileSystem fs;
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 4e9bcda..3b655be 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -33,6 +33,8 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
@@ -426,6 +428,11 @@ public class RawFile {
}
@Override
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
+ }
+
+ @Override
public boolean isSplittable(){
return false;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 0e628d4..2be2ec0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -34,6 +34,8 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BitArray;
@@ -299,6 +301,11 @@ public class RowFile {
}
@Override
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
+ }
+
+ @Override
public boolean isSplittable(){
return true;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
index 26083a5..729c237 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -35,6 +35,8 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
@@ -273,6 +275,11 @@ public class AvroScanner extends FileScanner {
return false;
}
+ @Override
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
+ }
+
/**
* Returns whether this scanner is splittable.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index 2f8efcf..822151a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -21,6 +21,8 @@ package org.apache.tajo.storage.parquet;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
@@ -107,6 +109,11 @@ public class ParquetScanner extends FileScanner {
return false;
}
+ @Override
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
+ }
+
/**
* Returns whether this scanner is splittable.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 1dcec5f..286ee3a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -37,6 +37,8 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
@@ -1787,6 +1789,11 @@ public class RCFile {
}
@Override
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
+ }
+
+ @Override
public boolean isSplittable() {
return true;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index ff73a1c..340e2fa 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -31,6 +31,8 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;
@@ -332,6 +334,11 @@ public class SequenceFileScanner extends FileScanner {
}
@Override
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
+ }
+
+ @Override
public boolean isSplittable(){
return true;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 55a2b96..2aa6707 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -34,6 +34,8 @@ import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
@@ -89,7 +91,7 @@ public class DelimitedTextFile {
serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
serdeClassCache.put(serDeClassName, serdeClass);
}
- lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
+ lineSerder = ReflectionUtil.newInstance(serdeClass);
} catch (Throwable e) {
throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
}
@@ -459,7 +461,8 @@ public class DelimitedTextFile {
}
@Override
- public void setSearchCondition(Object expr) {
+ public void setFilter(EvalNode filter) {
+ throw new UnsupportedException();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 331d3e8..35a5ea4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -107,7 +107,7 @@ public class TestMergeScanner {
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
if (storeType.equalsIgnoreCase("AVRO")) {
meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
TEST_MULTIPLE_FILES_AVRO_SCHEMA);
http://git-wip-us.apache.org/repos/asf/tajo/blob/8f3215d1/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index dbfdac3..b53dbec 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -264,7 +264,7 @@ public class TestStorages {
schema.addColumn("score", Type.FLOAT4);
TableMeta meta = CatalogUtil.newTableMeta(storeType);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
if (storeType.equalsIgnoreCase("AVRO")) {
meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
TEST_PROJECTION_AVRO_SCHEMA);
@@ -341,7 +341,7 @@ public class TestStorages {
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
if (storeType.equalsIgnoreCase("AVRO")) {
String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
@@ -414,7 +414,7 @@ public class TestStorages {
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
@@ -969,7 +969,7 @@ public class TestStorages {
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
FileTablespace sm = TablespaceManager.getLocalFs();
@@ -1033,7 +1033,7 @@ public class TestStorages {
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
Path tablePath = new Path(testDir, "test_storetype_oversize.data");
FileTablespace sm = TablespaceManager.getLocalFs();