You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/04 05:37:19 UTC

[1/7] tajo git commit: TAJO-1190: INSERT INTO to partition tables may cause NPE.

Repository: tajo
Updated Branches:
  refs/heads/index_support 708453cd8 -> 5c11283a6


TAJO-1190: INSERT INTO to partition tables may cause NPE.

Closes #250


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1cdbe467
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1cdbe467
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1cdbe467

Branch: refs/heads/index_support
Commit: 1cdbe467e3dc25d7af59afc116ff9e8e6273a1ac
Parents: b4adc18
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 3 02:25:34 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 3 02:25:34 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  8 ++-
 .../planner/physical/PhysicalPlanUtil.java      | 65 +++++++++++++++-----
 .../engine/planner/physical/SeqScanExec.java    |  7 ++-
 .../tajo/worker/TajoWorkerClientService.java    | 16 ++---
 .../tajo/engine/query/TestTablePartitions.java  | 56 +++++++++++++++++
 ...rtitionedTableWithSmallerExpressions5.result |  7 +++
 ...rtitionedTableWithSmallerExpressions6.result |  4 ++
 .../apache/tajo/rpc/RemoteCallException.java    |  6 +-
 8 files changed, 144 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8d51d44..188e024 100644
--- a/CHANGES
+++ b/CHANGES
@@ -15,7 +15,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
-    TAJO-1165: Needs to show error messages on query_executor.jsp. (Jihun Kang via jaehwa)
+    TAJO-1165: Needs to show error messages on query_executor.jsp. 
+    (Jihun Kang via jaehwa)
 
     TAJO-1204: Remove unused ServerName class. (DaeMyung Kang via jaehwa)
 
@@ -79,6 +80,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1190: INSERT INTO to partition tables may cause NPE. (hyunsik)
+
     TAJO-1211: Staging directory for CTAS and INSERT should be in 
     the output dir. (hyunsik)
 
@@ -87,7 +90,8 @@ Release 0.9.1 - unreleased
 
     TAJO-1119: JDBC driver should support TIMESTAMP type. (jaehwa)
 
-    TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
+    TAJO-1166: S3 related storage causes compilation error in Hadoop 
+    2.6.0-SNAPSHOT. (jaehwa)
 
     TAJO-1208: Failure of create table using textfile on hivemeta.
     (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index fe1f795..a63b838 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -77,31 +77,33 @@ public class PhysicalPlanUtil {
     Path path = new Path(tableDesc.getPath());
     FileSystem fs = path.getFileSystem(tajoConf);
 
+    // In the case of a partitioned table, we should return data files that share the same partition key.
+    int partitionDepth = 0;
+    if (tableDesc.hasPartition()) {
+      partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
+    }
+
     List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
     if (fs.exists(path)) {
       getNonZeroLengthDataFiles(fs, path, nonZeroLengthFiles, fileIndex, numResultFiles,
-          new AtomicInteger(0));
+          new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
     }
 
     List<FileFragment> fragments = new ArrayList<FileFragment>();
 
-    //In the case of partitioned table, return same partition key data files.
-    int numPartitionColumns = 0;
-    if (tableDesc.hasPartition()) {
-      numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
-    }
+
     String[] previousPartitionPathNames = null;
     for (FileStatus eachFile: nonZeroLengthFiles) {
       FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
 
-      if (numPartitionColumns > 0) {
+      if (partitionDepth > 0) {
         // finding partition key;
         Path filePath = fileFragment.getPath();
         Path parentPath = filePath;
-        String[] parentPathNames = new String[numPartitionColumns];
-        for (int i = 0; i < numPartitionColumns; i++) {
+        String[] parentPathNames = new String[partitionDepth];
+        for (int i = 0; i < partitionDepth; i++) {
           parentPath = parentPath.getParent();
-          parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
+          parentPathNames[partitionDepth - i - 1] = parentPath.getName();
         }
 
         // If current partitionKey == previousPartitionKey, add to result.
@@ -120,20 +122,53 @@ public class PhysicalPlanUtil {
     return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{}));
   }
 
+  /**
+   *
+   * @param fs
+   * @param path The table path
+   * @param result The final result files to be used
+   * @param startFileIndex
+   * @param numResultFiles
+   * @param currentFileIndex
+   * @param partitioned A flag to indicate if this table is partitioned
+   * @param currentDepth Current visiting depth of partition directories
+   * @param maxDepth The partition depth of this table
+   * @throws IOException
+   */
   private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
                                          int startFileIndex, int numResultFiles,
-                                         AtomicInteger currentFileIndex) throws IOException {
+                                         AtomicInteger currentFileIndex, boolean partitioned,
+                                         int currentDepth, int maxDepth) throws IOException {
+    // Intermediate directory
     if (fs.isDirectory(path)) {
+
       FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter);
+
       if (files != null && files.length > 0) {
+
         for (FileStatus eachFile : files) {
+
+          // stop as soon as enough files have been found
           if (result.size() >= numResultFiles) {
             return;
           }
+
           if (eachFile.isDirectory()) {
-            getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles,
-                currentFileIndex);
-          } else if (eachFile.isFile() && eachFile.getLen() > 0) {
+            getNonZeroLengthDataFiles(
+                fs,
+                eachFile.getPath(),
+                result,
+                startFileIndex,
+                numResultFiles,
+                currentFileIndex,
+                partitioned,
+                currentDepth + 1, // increment a visiting depth
+                maxDepth);
+
+
+            // For a partitioned table, we should ignore files located in intermediate directories.
+            // A file is guaranteed to be in a leaf directory only if currentDepth == maxDepth.
+          } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) {
             if (currentFileIndex.get() >= startFileIndex) {
               result.add(eachFile);
             }
@@ -141,6 +176,8 @@ public class PhysicalPlanUtil {
           }
         }
       }
+
+      // The given path is not a directory, so it is handled as a single file
     } else {
       FileStatus fileStatus = fs.getFileStatus(path);
       if (fileStatus != null && fileStatus.getLen() > 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 3cbb7c9..759b19c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -332,7 +332,12 @@ public class SeqScanExec extends PhysicalExec {
     if (scanner != null) {
       return scanner.getInputStats();
     } else {
-      return inputStats;
+      if (inputStats != null) {
+        return inputStats;
+      } else {
+        // If no fragment, there is no scanner. So, we need to create a dummy table stat.
+        return new TableStats();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index a41ffce..0f4a60c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
@@ -39,8 +40,6 @@ import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -132,6 +131,10 @@ public class TajoWorkerClientService extends AbstractService {
       return null;
     }
 
+    private boolean hasResultTableDesc(QueryContext queryContext) {
+      return !(queryContext.isCreateTable() || queryContext.isInsert());
+    }
+
     @Override
     public ClientProtos.GetQueryResultResponse getQueryResult(
             RpcController controller,
@@ -151,7 +154,9 @@ public class TajoWorkerClientService extends AbstractService {
       } else {
         switch (queryMasterTask.getState()) {
           case QUERY_SUCCEEDED:
-            builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
+//            if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) {
+              builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
+            //}
             break;
           case QUERY_FAILED:
           case QUERY_ERROR:
@@ -191,10 +196,7 @@ public class TajoWorkerClientService extends AbstractService {
           return builder.build();
         }
 
-        builder.setHasResult(
-            !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
-                queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
-        );
+        builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext()));
 
         queryMasterTask.touchSessionTime();
         Query query = queryMasterTask.getQuery();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 0e9ec7d..cff5bfb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.query;
 
 import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
@@ -820,6 +822,39 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
   }
 
+  @Test
+  public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5");
+    ResultSet res = executeString(
+        "create table " + tableName + " (col1 text) partition by column(col2 text) ");
+    res.close();
+
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");
+    res.close();
+    res = executeString("select * from " + tableName);
+    assertResultSet(res);
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6");
+    ResultSet res = executeString(
+        "create table " + tableName + " (col1 text) partition by column(col2 text) ");
+    res.close();
+
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    res = executeString(
+        "insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1");
+    res.close();
+    res = executeString("select * from " + tableName);
+    assertResultSet(res);
+    res.close();
+  }
+
   private MasterPlan getQueryPlan(ResultSet res) {
     QueryId queryId = ((TajoResultSet)res).getQueryId();
     for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) {
@@ -936,4 +971,25 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testIgnoreFilesInIntermediateDir() throws Exception {
+    // See - TAJO-1219: Files located in intermediate directories of partitioned table should be ignored
+    // It verifies that Tajo ignores files located in intermediate directories of partitioned table.
+
+    Path testDir = CommonTestingUtil.getTestDir();
+
+    executeString(
+        "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " +
+        "LOCATION '" + testDir + "'");
+
+    FileSystem fs = testDir.getFileSystem(conf);
+    FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
+    fos.write("a|b|c".getBytes());
+    fos.close();
+
+    ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;");
+    assertFalse(res.next());
+    res.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result
new file mode 100644
index 0000000..f972753
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result
@@ -0,0 +1,7 @@
+col1,col2
+-------------------------------
+N,__TAJO_DEFAULT_PARTITION__
+N,__TAJO_DEFAULT_PARTITION__
+N,__TAJO_DEFAULT_PARTITION__
+R,__TAJO_DEFAULT_PARTITION__
+R,__TAJO_DEFAULT_PARTITION__
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result
new file mode 100644
index 0000000..6b8e2f1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result
@@ -0,0 +1,4 @@
+col1,col2
+-------------------------------
+N,__TAJO_DEFAULT_PARTITION__
+N,__TAJO_DEFAULT_PARTITION__
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
index 90ee58a..52ef31a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -49,7 +49,11 @@ public class RemoteCallException extends RemoteException {
   public RpcResponse getResponse() {
     RpcResponse.Builder builder = RpcResponse.newBuilder();
     builder.setId(seqId);
-    builder.setErrorMessage(getCause().getMessage());
+    if (getCause().getMessage() == null) {
+      builder.setErrorMessage(getCause().getClass().getName());
+    } else {
+      builder.setErrorMessage(getCause().getMessage());
+    }
     builder.setErrorTrace(getStackTraceString(getCause()));
     builder.setErrorClass(originExceptionClass);
 


[5/7] tajo git commit: TAJO-1223: Wrong query verification against asterisk and more expressions in select list.

Posted by ji...@apache.org.
TAJO-1223: Wrong query verification against asterisk and more expressions in select list.

Closes #279


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/20d1f014
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/20d1f014
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/20d1f014

Branch: refs/heads/index_support
Commit: 20d1f0145b47e6c478bb55c79d670926e66f137c
Parents: ea2dbf8
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 3 17:34:28 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 3 18:34:33 2014 +0900

----------------------------------------------------------------------
 .../tajo/engine/query/TestInsertQuery.java      | 22 +++++++
 .../TestInsertQuery/lineitem_year_month_ddl.sql | 18 ++++++
 .../load_to_lineitem_year_month.sql             |  1 +
 .../testInsertOverwriteWithAsteriskAndMore.sql  |  1 +
 ...estInsertOverwriteWithAsteriskAndMore.result |  7 +++
 .../plan/verifier/PreLogicalPlanVerifier.java   | 64 ++++++++++++--------
 6 files changed, 87 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/20d1f014/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 117f186..cc7dced 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -392,6 +392,28 @@ public class TestInsertQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testInsertOverwriteWithAsteriskAndMore() throws Exception {
+    ResultSet res = executeFile("lineitem_year_month_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "lineitem_year_month"));
+
+    res = executeFile("load_to_lineitem_year_month.sql");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "lineitem_year_month");
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeQuery();
+    assertResultSet(res);
+    res.close();
+
+    executeString("DROP TABLE lineitem_year_month PURGE");
+  }
+
+  @Test
   public final void testInsertOverwriteIntoSelect() throws Exception {
     String tableName = CatalogUtil.normalizeIdentifier("insertoverwriteintoselect");
     ResultSet res = executeString("create table " + tableName + " as select l_orderkey from default.lineitem");

http://git-wip-us.apache.org/repos/asf/tajo/blob/20d1f014/tajo-core/src/test/resources/queries/TestInsertQuery/lineitem_year_month_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/lineitem_year_month_ddl.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/lineitem_year_month_ddl.sql
new file mode 100644
index 0000000..fb18ad8
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/lineitem_year_month_ddl.sql
@@ -0,0 +1,18 @@
+create table lineitem_year_month (
+ l_orderkey bigint,
+ l_partkey bigint,
+ l_suppkey bigint,
+ l_linenumber int,
+ l_quantity float8,
+ l_extendedprice float8,
+ l_discount float8,
+ l_tax float8,
+ l_returnflag text,
+ l_linestatus text,
+ l_shipdate text,
+ l_commitdate text,
+ l_receiptdate text,
+ l_shipinstruct text,
+ l_shipmode text,
+ l_comment text
+) PARTITION BY COLUMN (year TEXT, MONTH TEXT);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20d1f014/tajo-core/src/test/resources/queries/TestInsertQuery/load_to_lineitem_year_month.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/load_to_lineitem_year_month.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/load_to_lineitem_year_month.sql
new file mode 100644
index 0000000..601af12
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/load_to_lineitem_year_month.sql
@@ -0,0 +1 @@
+INSERT INTO lineitem_year_month SELECT *, SUBSTR(l_shipdate, 1,4) as year, SUBSTR(l_shipdate, 6, 2) as month FROM default.lineitem;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20d1f014/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.sql
new file mode 100644
index 0000000..07c904d
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.sql
@@ -0,0 +1 @@
+select * from lineitem_year_month;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20d1f014/tajo-core/src/test/resources/results/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.result b/tajo-core/src/test/resources/results/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.result
new file mode 100644
index 0000000..bb797ba
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestInsertQuery/testInsertOverwriteWithAsteriskAndMore.result
@@ -0,0 +1,7 @@
+l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,year,MONTH
+-------------------------------
+3,3,6540,2,49.0,46796.47,0.1,0.0,R,F,1993-11-09,1993-12-20,1993-11-24,TAKE BACK RETURN,RAIL, unusual accounts. eve,1993,11
+3,2,1798,1,45.0,54058.05,0.06,0.0,R,F,1994-02-02,1994-01-04,1994-02-23,NONE,AIR,ongside of the furiously brave acco,1994,02
+1,1,7706,1,17.0,21168.23,0.04,0.02,N,O,1996-03-13,1996-02-12,1996-03-22,DELIVER IN PERSON,TRUCK,egular courts above the,1996,03
+1,1,7311,2,36.0,45983.16,0.09,0.06,N,O,1996-04-12,1996-02-28,1996-04-20,TAKE BACK RETURN,MAIL,ly final dependencies: slyly bold ,1996,04
+2,2,1191,1,38.0,44694.46,0.0,0.05,N,O,1997-01-28,1997-01-14,1997-02-02,TAKE BACK RETURN,RAIL,ven requests. deposits breach a,1997,01
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/20d1f014/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
index 95e0f30..f6d04ba 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
@@ -254,36 +254,48 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVer
 
     if (child != null && child.getType() == OpType.Projection) {
       Projection projection = (Projection) child;
-      int projectColumnNum = projection.getNamedExprs().length;
 
-      if (expr.hasTargetColumns()) {
-        int targetColumnNum = expr.getTargetColumns().length;
+      // checking if at least one asterisk exists in target list
+      boolean includeAsterisk = false;
+      for (NamedExpr namedExpr : projection.getNamedExprs()) {
+        includeAsterisk |= namedExpr.getExpr().getType() == OpType.Asterisk;
+      }
 
-        if (targetColumnNum > projectColumnNum)  {
-          context.state.addVerification("INSERT has more target columns than expressions");
-        } else if (targetColumnNum < projectColumnNum) {
-          context.state.addVerification("INSERT has more expressions than target columns");
-        }
-      } else {
-        if (expr.hasTableName()) {
-          String qualifiedName = expr.getTableName();
-          if (TajoConstants.EMPTY_STRING.equals(CatalogUtil.extractQualifier(expr.getTableName()))) {
-            qualifiedName = CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE),
-                expr.getTableName());
-          }
+      // If an asterisk expression exists, the column-count checks below are skipped here;
+      // the match between the target exprs and output exprs will be verified in LogicalPlanVerifier.
+      if (!includeAsterisk) {
+
+        int projectColumnNum = projection.getNamedExprs().length;
+
+        if (expr.hasTargetColumns()) {
+          int targetColumnNum = expr.getTargetColumns().length;
 
-          TableDesc table = catalog.getTableDesc(qualifiedName);
-          if (table == null) {
-            context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
-            return null;
+          if (targetColumnNum > projectColumnNum) {
+            context.state.addVerification("INSERT has more target columns than expressions");
+          } else if (targetColumnNum < projectColumnNum) {
+            context.state.addVerification("INSERT has more expressions than target columns");
           }
-          if (table.hasPartition()) {
-            int columnSize = table.getSchema().getColumns().size();
-            columnSize += table.getPartitionMethod().getExpressionSchema().getColumns().size();
-            if (projectColumnNum < columnSize) {
-              context.state.addVerification("INSERT has smaller expressions than target columns");
-            } else if (projectColumnNum > columnSize) {
-              context.state.addVerification("INSERT has more expressions than target columns");
+        } else {
+          if (expr.hasTableName()) {
+            String qualifiedName = expr.getTableName();
+            if (TajoConstants.EMPTY_STRING.equals(CatalogUtil.extractQualifier(expr.getTableName()))) {
+              qualifiedName = CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE),
+                  expr.getTableName());
+            }
+
+            TableDesc table = catalog.getTableDesc(qualifiedName);
+            if (table == null) {
+              context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
+              return null;
+            }
+            if (table.hasPartition()) {
+              int columnSize = table.getSchema().getColumns().size();
+              columnSize += table.getPartitionMethod().getExpressionSchema().getColumns().size();
+              if (projectColumnNum < columnSize) {
+                context.state.addVerification("INSERT has smaller expressions than target columns");
+              } else if (projectColumnNum > columnSize) {
+                context.state.addVerification("INSERT has more expressions than target columns");
+              }
             }
           }
         }


[7/7] tajo git commit: Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5c11283a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5c11283a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5c11283a

Branch: refs/heads/index_support
Commit: 5c11283a6d892215b1abb8f30f81a75f7e529bf6
Parents: 708453c 9f8be1a
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 4 13:36:48 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 4 13:36:48 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  20 ++-
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  |  13 +-
 .../apache/tajo/storage/StorageConstants.java   |  14 +-
 .../planner/physical/PhysicalPlanUtil.java      |  65 ++++++--
 .../engine/planner/physical/SeqScanExec.java    |   7 +-
 .../tajo/worker/TajoWorkerClientService.java    |  16 +-
 .../tajo/engine/query/TestCaseByCases.java      |   8 +
 .../tajo/engine/query/TestInsertQuery.java      |  22 +++
 .../tajo/engine/query/TestTablePartitions.java  |  56 +++++++
 .../TestCaseByCases/testTAJO1224Case1.sql       |   1 +
 .../TestInsertQuery/lineitem_year_month_ddl.sql |  18 ++
 .../load_to_lineitem_year_month.sql             |   1 +
 .../testInsertOverwriteWithAsteriskAndMore.sql  |   1 +
 .../TestCaseByCases/testTAJO1224Case1.result    |   3 +
 ...estInsertOverwriteWithAsteriskAndMore.result |   7 +
 ...rtitionedTableWithSmallerExpressions5.result |   7 +
 ...rtitionedTableWithSmallerExpressions6.result |   4 +
 .../src/main/sphinx/table_management/csv.rst    |   5 +
 .../org/apache/tajo/jdbc/JdbcConnection.java    |  13 +-
 .../org/apache/tajo/jdbc/TajoStatement.java     |   7 +-
 .../plan/verifier/PreLogicalPlanVerifier.java   |  64 +++++---
 .../apache/tajo/rpc/RemoteCallException.java    |   6 +-
 .../storage/FieldSerializerDeserializer.java    |   4 +-
 .../tajo/storage/json/JsonLineDeserializer.java |  13 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |   2 +-
 .../tajo/storage/text/DelimitedTextFile.java    |  89 +++++++---
 .../tajo/storage/text/TextLineDeserializer.java |   2 +-
 .../tajo/storage/text/TextLineParsingError.java |  31 ++++
 .../tajo/storage/TestDelimitedTextFile.java     | 164 +++++++++++++++++++
 .../testErrorTolerance1.json                    |   6 +
 .../testErrorTolerance2.json                    |   4 +
 31 files changed, 581 insertions(+), 92 deletions(-)
----------------------------------------------------------------------



[4/7] tajo git commit: TAJO-1220: Implement createStatement() and setEscapeProcessing() in JdbcConnection. (YeonSu Han via hyunsik)

Posted by ji...@apache.org.
TAJO-1220: Implement createStatement() and setEscapeProcessing() in JdbcConnection. (YeonSu Han via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ea2dbf82
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ea2dbf82
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ea2dbf82

Branch: refs/heads/index_support
Commit: ea2dbf82c04d609dac46abd322241a15ccf30956
Parents: f69938a
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 3 16:04:56 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 3 16:04:56 2014 +0900

----------------------------------------------------------------------
 CHANGES                                                |  3 +++
 .../main/java/org/apache/tajo/jdbc/JdbcConnection.java | 13 ++++++++++---
 .../main/java/org/apache/tajo/jdbc/TajoStatement.java  |  7 ++++---
 3 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ea2dbf82/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c26b8a9..ea8e1ca 100644
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1220: Implement createStatement() and setEscapeProcessing() in 
+    JdbcConnection. (YeonSu Han via hyunsik)
+
     TAJO-1183: Keep command execution even with errors. (Jaewoong Jung via 
     hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ea2dbf82/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index 4250da4..a76443d 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -181,9 +181,16 @@ public class JdbcConnection implements Connection {
   }
 
   @Override
-  public Statement createStatement(int resultSetType, int resultSetConcurrency)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException("createStatement");
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE) {
+      throw new SQLException("TYPE_SCROLL_SENSITIVE is not supported");
+    }
+
+    if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) {
+      throw new SQLException("CONCUR_READ_ONLY mode is not supported.");
+    }
+
+    return new TajoStatement(this, tajoClient);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/ea2dbf82/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index 8308211..eb7f8c9 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -311,10 +311,11 @@ public class TajoStatement implements Statement {
     throw new SQLFeatureNotSupportedException("setCursorName not supported");
   }
 
+  /**
+   * Not necessary.
+   */
   @Override
-  public void setEscapeProcessing(boolean enable) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setEscapeProcessing not supported");
-  }
+  public void setEscapeProcessing(boolean enable) throws SQLException {}
 
   @Override
   public void setFetchDirection(int direction) throws SQLException {


[3/7] tajo git commit: TAJO-1222: DelimitedTextFile should be tolerant against parsing errors.

Posted by ji...@apache.org.
TAJO-1222: DelimitedTextFile should be tolerant against parsing errors.

Closes #277


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f69938ab
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f69938ab
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f69938ab

Branch: refs/heads/index_support
Commit: f69938abecd3d53968e318d97aba53d9acd3de40
Parents: 5066ac3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 3 15:44:05 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 3 15:44:05 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/storage/StorageConstants.java   |  14 +-
 .../src/main/sphinx/table_management/csv.rst    |   5 +
 .../storage/FieldSerializerDeserializer.java    |   4 +-
 .../tajo/storage/json/JsonLineDeserializer.java |  13 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |   2 +-
 .../tajo/storage/text/DelimitedTextFile.java    |  84 +++++++---
 .../tajo/storage/text/TextLineDeserializer.java |   2 +-
 .../tajo/storage/text/TextLineParsingError.java |  31 ++++
 .../tajo/storage/TestDelimitedTextFile.java     | 164 +++++++++++++++++++
 .../testErrorTolerance1.json                    |   6 +
 .../testErrorTolerance2.json                    |   4 +
 12 files changed, 303 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c03b72b..c26b8a9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,6 +5,9 @@ Release 0.9.1 - unreleased
 
   NEW FEATURES
 
+    TAJO-1222: DelimitedTextFile should be tolerant against parsing errors.
+    (hyunsik)
+
     TAJO-1026: Implement Query history persistency manager.(Hyoungjun Kim)
 
     TAJO-233: Support PostgreSQL CatalogStore. (Jihun Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index a3d8de0..459c9c9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -32,8 +32,20 @@ public class StorageConstants {
 
   public static final String TEXT_DELIMITER = "text.delimiter";
   public static final String TEXT_NULL = "text.null";
-  public static final String TEXT_SERDE_CLASS = "text.serde.class";
+  public static final String TEXT_SERDE_CLASS = "text.serde";
   public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe";
+  /**
+   * It's the maximum number of parsing errors to be tolerated.
+   *
+   * <ul>
+   *   <li>If it is -1, it is always tolerant against any parsing error.</li>
+   *   <li>If it is 0, it does not permit any parsing error.</li>
+   *   <li>If it is some positive integer (i.e., > 0), the given number of parsing errors in each
+   *       task will be permissible</li>
+   * </ul>
+   **/
+  public static final String TEXT_ERROR_TOLERANCE_MAXNUM = "text.error-tolerance.max-num";
+  public static final String DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM = "0";
 
   @Deprecated
   public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter";

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-docs/src/main/sphinx/table_management/csv.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/csv.rst b/tajo-docs/src/main/sphinx/table_management/csv.rst
index 3aba2ba..71313d6 100644
--- a/tajo-docs/src/main/sphinx/table_management/csv.rst
+++ b/tajo-docs/src/main/sphinx/table_management/csv.rst
@@ -40,6 +40,11 @@ Now, the CSV storage format provides the following physical properties.
 * ``text.null``: NULL character. The default NULL character is an empty string ``''``. Hive's default NULL character is ``'\\N'``.
 * ``compression.codec``: Compression codec. You can enable compression feature and set specified compression algorithm. The compression algorithm used to compress files. The compression codec name should be the fully qualified class name inherited from `org.apache.hadoop.io.compress.CompressionCodec <https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/compress/CompressionCodec.html>`_. By default, compression is disabled.
 * ``csvfile.serde``: custom (De)serializer class. ``org.apache.tajo.storage.TextSerializerDeserializer`` is the default (De)serializer class.
+* ``text.error-tolerance.max-num``: the maximum number of permissible parsing errors. This value should be an integer value. By default, ``text.error-tolerance.max-num`` is ``0``. According to the value, parsing errors will be handled in different ways.
+
+  * If ``text.error-tolerance.max-num < 0``, all parsing errors are ignored.
+  * If ``text.error-tolerance.max-num == 0``, no parsing error is allowed. If any error occurs, the query will fail. (default)
+  * If ``text.error-tolerance.max-num > 0``, the given number of parsing errors in each task will be permissible.
 
 The following example is to set a custom field delimiter, NULL character, and compression codec:
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
index 7df4584..0b3755d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
 import io.netty.buffer.ByteBuf;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.text.TextLineParsingError;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -30,6 +31,7 @@ public interface FieldSerializerDeserializer {
 
   public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
 
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
+      throws IOException, TextLineParsingError;
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
index 37cd9f3..dfe36f6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
 import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
@@ -30,9 +31,11 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -55,7 +58,7 @@ public class JsonLineDeserializer extends TextLineDeserializer {
   }
 
   @Override
-  public void deserialize(ByteBuf buf, Tuple output) throws IOException {
+  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
     byte [] line = new byte[buf.readableBytes()];
     buf.readBytes(line);
 
@@ -170,8 +173,9 @@ public class JsonLineDeserializer extends TextLineDeserializer {
           if (jsonObject == null) {
             output.put(actualIdx, NullDatum.get());
             break;
-          } if (jsonObject instanceof String) {
-            output.put(actualIdx, DatumFactory.createBlob((String)jsonObject));
+          }
+          if (jsonObject instanceof String) {
+            output.put(actualIdx, DatumFactory.createBlob((String) jsonObject));
           } else if (jsonObject instanceof JSONArray) {
             JSONArray jsonArray = (JSONArray) jsonObject;
             byte[] bytes = new byte[jsonArray.size()];
@@ -208,7 +212,8 @@ public class JsonLineDeserializer extends TextLineDeserializer {
           throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
         }
       }
-
+    } catch (ParseException pe) {
+      throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe);
     } catch (Throwable e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 0e2dfb0..f2eebc6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -48,7 +48,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
     fieldSerDer = new TextFieldSerializerDeserializer();
   }
 
-  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException {
+  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
     int[] projection = targetColumnIndexes;
     if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
       return;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 2218fae..c54131b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -48,6 +48,9 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
+import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
+
 public class DelimitedTextFile {
 
   public static final byte LF = '\n';
@@ -267,12 +270,19 @@ public class DelimitedTextFile {
     private final long startOffset;
 
     private final long endOffset;
+    /** The number of actual read records */
     private int recordCount = 0;
     private int[] targetColumnIndexes;
 
     private DelimitedLineReader reader;
     private TextLineDeserializer deserializer;
 
+    private int errorPrintOutMaxNum = 5;
+    /** Maximum number of permissible errors */
+    private int errorTorrenceMaxNum;
+    /** How many errors have occurred? */
+    private int errorNum;
+
     public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
                                     final FileFragment fragment)
         throws IOException {
@@ -284,10 +294,9 @@ public class DelimitedTextFile {
 
       startOffset = fragment.getStartKey();
       endOffset = startOffset + fragment.getEndKey();
-    }
 
-    public TextLineSerDe getLineSerde() {
-      return DelimitedTextFile.getLineSerde(meta);
+      errorTorrenceMaxNum =
+          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
     }
 
     @Override
@@ -295,6 +304,7 @@ public class DelimitedTextFile {
       if (reader != null) {
         reader.close();
       }
+
       reader = new DelimitedLineReader(conf, fragment);
       reader.init();
       recordCount = 0;
@@ -322,15 +332,8 @@ public class DelimitedTextFile {
       deserializer.init();
     }
 
-    public ByteBuf readLine() throws IOException {
-      ByteBuf buf = reader.readLine();
-      if (buf == null) {
-        return null;
-      } else {
-        recordCount++;
-      }
-
-      return buf;
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
     }
 
     @Override
@@ -355,21 +358,60 @@ public class DelimitedTextFile {
 
     @Override
     public Tuple next() throws IOException {
+
+      if (!reader.isReadable()) {
+        return null;
+      }
+
+      if (targets.length == 0) {
+        return EmptyTuple.get();
+      }
+
+      VTuple tuple = new VTuple(schema.size());
+
       try {
-        if (!reader.isReadable()) return null;
 
-        ByteBuf buf = readLine();
-        if (buf == null) return null;
+        // this loop will continue until one tuple is built or EOS (end of stream).
+        do {
 
-        if (targets.length == 0) {
-          return EmptyTuple.get();
-        }
+          ByteBuf buf = reader.readLine();
+          if (buf == null) {
+            return null;
+          }
+
+          try {
+
+            deserializer.deserialize(buf, tuple);
+            // if a line is read normally, it exits this loop.
+            break;
+
+          } catch (TextLineParsingError tae) {
+
+            errorNum++;
+
+            // suppress too many log prints, which probably cause performance degradation
+            if (errorNum < errorPrintOutMaxNum) {
+              LOG.warn("Ignore JSON Parse Error (" + errorNum + "): ", tae);
+            }
+
+            // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0),
+            // it checks if the number of parsing error exceeds the max limit.
+            // Otherwise, it will ignore all parsing errors.
+            if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) {
+              throw tae;
+            }
+            continue;
+          }
+
+        } while (reader.isReadable()); // continue until EOS
+
+        // recordCount means the number of actual read records. We increment the count here.
+        recordCount++;
 
-        VTuple tuple = new VTuple(schema.size());
-        deserializer.deserialize(buf, tuple);
         return tuple;
+
       } catch (Throwable t) {
-        LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
+        LOG.error(t);
         throw new IOException(t);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index b0d3c3a..7ebfa79 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -51,7 +51,7 @@ public abstract class TextLineDeserializer {
    * @param output Tuple to be filled with read fields
    * @throws java.io.IOException
    */
-  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException;
+  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
 
   /**
    * Release external resources

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
new file mode 100644
index 0000000..f0bae5e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+public class TextLineParsingError extends Exception {
+
+  public TextLineParsingError(Throwable t) {
+    super(t);
+  }
+
+  public TextLineParsingError(String message, Throwable t) {
+    super(t.getMessage() + ", Error line: " + message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
new file mode 100644
index 0000000..93fb12b
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+public class TestDelimitedTextFile {
+
+  private static Schema schema = new Schema();
+
+  private static Tuple baseTuple = new VTuple(10);
+
+  static {
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+
+    baseTuple.put(new Datum[] {
+        DatumFactory.createBool(true),                // 0
+        DatumFactory.createChar("hyunsik"),           // 1
+        DatumFactory.createInt2((short) 17),          // 2
+        DatumFactory.createInt4(59),                  // 3
+        DatumFactory.createInt8(23l),                 // 4
+        DatumFactory.createFloat4(77.9f),             // 5
+        DatumFactory.createFloat8(271.9d),            // 6
+        DatumFactory.createText("hyunsik"),           // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),// 8
+        DatumFactory.createInet4("192.168.0.1"),      // 9
+    });
+  }
+
+  public static Path getResourcePath(String path, String suffix) {
+    URL resultBaseURL = ClassLoader.getSystemResource(path);
+    return new Path(resultBaseURL.toString(), suffix);
+  }
+
+  public static Path getResultPath(Class clazz, String fileName) {
+    return new Path (getResourcePath("results", clazz.getSimpleName()), fileName);
+  }
+
+  public static String getResultText(Class clazz, String fileName) throws IOException {
+    FileSystem localFS = FileSystem.getLocal(new Configuration());
+    Path path = getResultPath(clazz, fileName);
+    Preconditions.checkState(localFS.exists(path) && localFS.isFile(path));
+    return FileUtil.readTextFile(new File(path.toUri()));
+  }
+
+  private static final FileFragment getFileFragment(String fileName) throws IOException {
+    TajoConf conf = new TajoConf();
+    Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName);
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(tablePath);
+    return new FileFragment("table", tablePath, 0, status.getLen());
+  }
+
+  @Test
+  public void testIgnoreAllErrors() throws IOException {
+    TajoConf conf = new TajoConf();
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
+    FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    Tuple tuple;
+    int i = 0;
+    while ((tuple = scanner.next()) != null) {
+      assertEquals(baseTuple, tuple);
+      i++;
+    }
+    assertEquals(3, i);
+    scanner.close();
+  }
+
+  @Test
+  public void testIgnoreOneErrorTolerance() throws IOException {
+
+
+    TajoConf conf = new TajoConf();
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
+    FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    assertNotNull(scanner.next());
+    assertNotNull(scanner.next());
+    try {
+      scanner.next();
+    } catch (IOException ioe) {
+      System.out.println(ioe);
+      return;
+    } finally {
+      scanner.close();
+    }
+    fail();
+  }
+
+  @Test
+  public void testNoErrorTolerance() throws IOException {
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
+    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
+    FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    try {
+      scanner.next();
+    } catch (IOException ioe) {
+      return;
+    } finally {
+      scanner.close();
+    }
+    fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
new file mode 100644
index 0000000..739dfe7
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
@@ -0,0 +1,6 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/f69938ab/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
new file mode 100644
index 0000000..8256b72
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
@@ -0,0 +1,4 @@
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
+{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
\ No newline at end of file


[2/7] tajo git commit: TAJO-1183: Keep command execution even with errors. (Jaewoong Jung via hyunsik)

Posted by ji...@apache.org.
TAJO-1183: Keep command execution even with errors. (Jaewoong Jung via hyunsik)

Closes #266


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5066ac3e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5066ac3e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5066ac3e

Branch: refs/heads/index_support
Commit: 5066ac3e80da390ff0f5733d345cbc63ec252843
Parents: 1cdbe46
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 3 15:30:16 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Dec 3 15:32:16 2014 +0900

----------------------------------------------------------------------
 CHANGES                                                |  3 +++
 .../main/java/org/apache/tajo/cli/tsql/TajoCli.java    | 13 +++++++------
 2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5066ac3e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 188e024..c03b72b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -80,6 +80,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1183: Keep command execution even with errors. (Jaewoong Jung via 
+    hyunsik)
+
     TAJO-1190: INSERT INTO to partition tables may cause NPE. (hyunsik)
 
     TAJO-1211: Staging directory for CTAS and INSERT should be in 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5066ac3e/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index e96017b..d4886cf 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -390,7 +390,7 @@ public class TajoCli {
   public int runShell() throws Exception {
     String line;
     String currentPrompt = context.getCurrentDatabase();
-    int exitCode = 0;
+    int exitCode;
 
     sout.write("Try \\? for help.\n");
 
@@ -416,7 +416,8 @@ public class TajoCli {
           exitCode = executeParsedResults(parsedResults);
           currentPrompt = updatePrompt(parser.getState());
 
-          if (exitCode != 0 && context.getBool(SessionVars.ON_ERROR_STOP)) {
+          // if at least one failed
+          if (exitCode != 0) {
             return exitCode;
           }
         }
@@ -430,11 +431,11 @@ public class TajoCli {
       
       throw e;
     }
-    return exitCode;
+    return 0;
   }
 
   private int executeParsedResults(Collection<ParsedResult> parsedResults) throws Exception {
-    int exitCode = 0;
+    int exitCode;
     for (ParsedResult parsedResult : parsedResults) {
       if (parsedResult.getType() == META) {
         exitCode = executeMetaCommand(parsedResult.getStatement());
@@ -442,12 +443,12 @@ public class TajoCli {
         exitCode = executeQuery(parsedResult.getStatement());
       }
 
-      if (exitCode != 0) {
+      if (exitCode != 0 && context.getBool(SessionVars.ON_ERROR_STOP)) {
         return exitCode;
       }
     }
 
-    return exitCode;
+    return 0;
   }
 
   public int executeMetaCommand(String line) throws Exception {


[6/7] tajo git commit: TAJO-1224: When there are no projected columns, a JSON scan can hang.

Posted by ji...@apache.org.
TAJO-1224: When there are no projected columns, a JSON scan can hang.

Closes #281


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9f8be1a6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9f8be1a6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9f8be1a6

Branch: refs/heads/index_support
Commit: 9f8be1a695298e2e9fe0d881ddfcb310b5a7460b
Parents: 20d1f01
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Dec 4 10:55:19 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Dec 4 11:28:44 2014 +0900

----------------------------------------------------------------------
 CHANGES                                          |  3 +++
 .../tajo/engine/query/TestCaseByCases.java       |  8 ++++++++
 .../TestCaseByCases/testTAJO1224Case1.sql        |  1 +
 .../TestCaseByCases/testTAJO1224Case1.result     |  3 +++
 .../tajo/storage/text/DelimitedTextFile.java     | 19 ++++++++++++-------
 5 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/9f8be1a6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ea8e1ca..60aa3e0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1224: When there is no projected column, json scan can be hang. 
+    (hyunsik) 
+
     TAJO-1220: Implement createStatement() and setEscapeProcessing() in 
     JdbcConnection. (YeonSu Han via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9f8be1a6/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
index 846c290..bcf00f8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCaseByCases.java
@@ -172,4 +172,12 @@ public class TestCaseByCases extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testTAJO1224Case1() throws Exception {
+    executeString("CREATE TABLE TAJO1224 USING JSON AS SELECT * FROM LINEITEM").close();
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9f8be1a6/tajo-core/src/test/resources/queries/TestCaseByCases/testTAJO1224Case1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCaseByCases/testTAJO1224Case1.sql b/tajo-core/src/test/resources/queries/TestCaseByCases/testTAJO1224Case1.sql
new file mode 100644
index 0000000..d05a563
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestCaseByCases/testTAJO1224Case1.sql
@@ -0,0 +1 @@
+select count(*) from tajo1224;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/9f8be1a6/tajo-core/src/test/resources/results/TestCaseByCases/testTAJO1224Case1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestCaseByCases/testTAJO1224Case1.result b/tajo-core/src/test/resources/results/TestCaseByCases/testTAJO1224Case1.result
new file mode 100644
index 0000000..19336a7
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestCaseByCases/testTAJO1224Case1.result
@@ -0,0 +1,3 @@
+?count
+-------------------------------
+5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/9f8be1a6/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index c54131b..ab8a0b5 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -358,29 +358,34 @@ public class DelimitedTextFile {
 
     @Override
     public Tuple next() throws IOException {
+      VTuple tuple;
 
       if (!reader.isReadable()) {
         return null;
       }
 
-      if (targets.length == 0) {
-        return EmptyTuple.get();
-      }
-
-      VTuple tuple = new VTuple(schema.size());
-
       try {
 
         // this loop will continue until one tuple is build or EOS (end of stream).
         do {
 
           ByteBuf buf = reader.readLine();
+
+          // if no more line, then return EOT (end of tuple)
           if (buf == null) {
             return null;
           }
 
-          try {
+          // If there is no required column, we just read each line
+          // and then return an empty tuple without parsing line.
+          if (targets.length == 0) {
+            recordCount++;
+            return EmptyTuple.get();
+          }
 
+          tuple = new VTuple(schema.size());
+
+          try {
             deserializer.deserialize(buf, tuple);
             // if a line is read normaly, it exists this loop.
             break;