You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/10 03:02:37 UTC

tajo git commit: TAJO-1825: Remove zero length fragments when file length is zero.

Repository: tajo
Updated Branches:
  refs/heads/master bd90521ef -> 7f7f4d119


TAJO-1825: Remove zero length fragments when file length is zero.

Closes #744


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7f7f4d11
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7f7f4d11
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7f7f4d11

Branch: refs/heads/master
Commit: 7f7f4d1195ad0d0b60059d9729f318556ed66a10
Parents: bd90521
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Sep 10 10:01:51 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Sep 10 10:01:51 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/client/v2/LegacyClientDelegate.java    |  6 ++
 .../apache/tajo/querymaster/TestQueryState.java | 88 ++++++++++++++++++++
 .../java/org/apache/tajo/querymaster/Stage.java |  8 +-
 .../main/java/org/apache/tajo/util/JSPUtil.java |  4 +
 .../org/apache/tajo/storage/FileTablespace.java |  3 -
 .../apache/tajo/storage/TestFileTablespace.java | 49 +++++++++++
 .../org/apache/tajo/storage/TestStorages.java   | 44 ++++++++++
 8 files changed, 199 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9030e77..9dc0ca5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -36,6 +36,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1825: Remove zero length fragments when file length is zero. (jinho)
+
     TAJO-1828: tajo-daemon scripts should kill process after process can not 
     stop gracefully. (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
index 42edadf..0a2c6de 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
@@ -66,6 +66,12 @@ public class LegacyClientDelegate extends SessionConnection implements ClientDel
   }
 
   @Override
+  public void close() { // shuts down this delegate's executor, then closes the underlying session connection
+    executor.shutdown(); // NOTE(review): executor is declared outside this hunk; if it is an ExecutorService, shutdown() rejects new tasks but does not wait for running ones
+    super.close();
+  }
+
+  @Override
   public int executeUpdate(String sql) throws TajoException {
     queryClient.updateQuery(sql);
     return 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
index a43491b..5175186 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
@@ -20,11 +20,13 @@ package org.apache.tajo.querymaster;
 
 import org.apache.tajo.*;
 import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.util.history.StageHistory;
 import org.junit.AfterClass;
@@ -32,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.sql.ResultSet;
 import java.util.List;
 
 import static org.junit.Assert.*;
@@ -100,4 +103,89 @@ public class TestQueryState {
     /* get status from TajoMaster */
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState());
   }
+
+  @Test(timeout = 10000)
+  public void testEmptyTable() throws Exception { // a query over a zero-row table must succeed and its leaf stage must schedule nothing
+    //create empty table
+    client.executeQueryAndGetResult("create table lineitem_empty as select * from lineitem where l_orderkey = -1");
+    TableStats stats = client.getTableDesc("lineitem_empty").getStats();
+    assertEquals(0L, stats.getNumBytes().longValue());
+    assertEquals(0L, stats.getNumRows().longValue());
+
+        String queryStr = "select count(*) from lineitem_empty";
+    /*
+    Optimized master plan
+      -------------------------------------------------------------------------------
+      Execution Block Graph (TERMINAL - eb_1441688509247_0002_000003)
+      -------------------------------------------------------------------------------
+      |-eb_1441688509247_0002_000003
+         |-eb_1441688509247_0002_000002
+            |-eb_1441688509247_0002_000001
+      -------------------------------------------------------------------------------
+      Order of Execution
+      -------------------------------------------------------------------------------
+      1: eb_1441688509247_0002_000001
+      2: eb_1441688509247_0002_000002
+      3: eb_1441688509247_0002_000003
+      -------------------------------------------------------------------------------
+
+      =======================================================
+      Block Id: eb_1441688509247_0002_000001 [LEAF]
+      =======================================================
+
+      [Outgoing]
+      [q_1441688509247_0002] 1 => 2 (type=HASH_SHUFFLE, key=, num=1)
+
+      GROUP_BY(5)()
+        => exprs: (count())
+        => target list: ?count_1 (INT8)
+        => out schema:{(1) ?count_1 (INT8)}
+        => in schema:{(0) }
+         SCAN(0) on default.lineitem_empty
+           => target list:
+           => out schema: {(0) }
+
+    */
+
+    ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); // submitted asynchronously; completion is polled below
+    QueryId queryId = new QueryId(res.getQueryId());
+
+    QueryStatus queryState = client.getQueryStatus(queryId);
+    while (!TajoClientUtil.isQueryComplete(queryState.getState())) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Query state : " + queryState); // NOTE(review): the interrupt flag is not restored before failing
+      }
+      queryState = client.getQueryStatus(queryId);
+    }
+
+    ResultSet resultSet = client.getQueryResult(queryId); // NOTE(review): this ResultSet is never closed in this test
+    assertTrue(resultSet.next());
+    assertEquals(0, resultSet.getLong(1)); // count(*) over the empty table
+
+    QueryInfo queryInfo = cluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(queryId);
+    assertEquals(queryId, queryInfo.getQueryId());
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, queryInfo.getQueryState());
+
+    QueryHistory history = cluster.getQueryHistory(queryId);
+    List<StageHistory> stages = history.getStageHistories();
+
+    assertFalse(stages.isEmpty());
+
+    for (StageHistory stage : stages) {
+      ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(stage.getExecutionBlockId());
+      //find leaf stage
+      if (executionBlockId.getId() == 1) {
+        assertEquals(0, stage.getTotalScheduledObjectsCount()); // zero-length files yield no fragments, so the leaf scan schedules nothing
+      } else {
+        assertNotEquals(0, stage.getTotalScheduledObjectsCount()); // NOTE(review): assumes every non-leaf stage still schedules at least one object; confirm
+      }
+      assertEquals(StageState.SUCCEEDED.toString(), stage.getState());
+    }
+
+    /* get status from TajoMaster */
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState());
+    client.executeQueryAndGetResult("drop table lineitem_empty purge"); // NOTE(review): skipped if an earlier assertion fails; consider a finally block
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 0276cc2..1f98457 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -141,6 +141,10 @@ public class Stage implements EventHandler<StageEvent> {
           .addTransition(StageState.INITED, StageState.INITED,
               StageEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.INITED,
+              EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+              StageEventType.SQ_STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
           .addTransition(StageState.INITED, StageState.KILL_WAIT,
               StageEventType.SQ_KILL, new KillTasksTransition())
           .addTransition(StageState.INITED, StageState.ERROR,
@@ -845,8 +849,8 @@ public class Stage implements EventHandler<StageEvent> {
                             LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
 
                             if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
-                              stage.finalizeStage();
-                              stage.complete();
+                              stage.eventHandler.handle(
+                                  new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
                             } else {
                               if(stage.getSynchronizedState() == StageState.INITED) {
                                 stage.taskScheduler.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 1d93c3c..919b2c2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -287,7 +287,11 @@ public class JSPUtil {
   }
 
   static final DecimalFormat PERCENT_FORMAT = new DecimalFormat("###.#");
+
+  public static String percentFormat(float value) { // formats a ratio as a percentage with at most one decimal digit (e.g. 0.5f -> "50")
+    if (Float.isInfinite(value) || Float.isNaN(value)) { // infinite/NaN ratios (presumably from dividing by a zero total) are shown as 0
+      value = 0.0f;
+    }
     return PERCENT_FORMAT.format(value * 100.0f);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index f79471a..f4c78b6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -556,9 +556,6 @@ public class FileTablespace extends Tablespace {
               splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
             }
           }
-        } else {
-          //for zero length files
-          splits.add(makeSplit(tableName, path, 0, length));
         }
       }
       if(LOG.isDebugEnabled()){

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index 37fbfe4..cecd4dd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -158,6 +158,55 @@ public class TestFileTablespace {
   }
 
   @Test
+  public void testZeroLengthSplit() throws Exception { // zero-length files must produce no fragments from getSplits
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitClusterUp();
+    TajoConf tajoConf = new TajoConf(conf); // NOTE(review): configured on the next line but never used; space.init() below receives a fresh new TajoConf(conf)
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+    int testCount = 10;
+    Path tablePath = new Path("/testZeroLengthSplit");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test partitions
+      List<Path> partitions = Lists.newArrayList();
+      for (int i =0; i < testCount; i++){
+        Path tmpFile = new Path(tablePath, String.valueOf(i));
+
+        //creates zero length file
+        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 0, (short) 2, 0xDEADDEADl); // NOTE(review): replication 2 exceeds the single datanode (presumably harmless for an empty file); an uppercase 'L' suffix reads more clearly
+        partitions.add(tmpFile);
+      }
+
+      assertTrue(fs.exists(tablePath));
+      FileTablespace space = new FileTablespace("testZeroLengthSplit", fs.getUri());
+      space.init(new TajoConf(conf));
+      assertEquals(fs.getUri(), space.getUri());
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age",Type.INT4);
+      schema.addColumn("name",Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta("TEXT");
+
+      List<Fragment> splits = Lists.newArrayList();
+      // Get FileFragments in partition batch
+      splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+      assertEquals(0, splits.size()); // none of the zero-length files yields a split
+      fs.close(); // NOTE(review): not reached if an assertion fails; the finally block only shuts the cluster down
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+
+  @Test
   public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 1051b3b..66d86f2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -40,6 +40,7 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.RCFile;
 import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -205,6 +206,49 @@ public class TestStorages {
   }
 
   @Test
+  public void testZeroRows() throws IOException { // an appender closed without writing tuples must read back as zero rows
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
+    if (storeType.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testZeroRows.data");
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    appender.close(); // closed without appending any tuple
+
+    TableStats stat = appender.getStats();
+    assertEquals(0, stat.getNumRows().longValue());
+
+    if(internalType || BuiltinStorages.TEXT.equals(storeType)) { // NOTE(review): other formats presumably write headers/metadata even with no rows; confirm
+      FileStatus fileStatus = fs.getFileStatus(tablePath);
+      assertEquals(0, fileStatus.getLen());
+    }
+
+    List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, testDir); // NOTE(review): splits every file under testDir, not only tablePath
+    int tupleCnt = 0;
+    for (Fragment fragment : splits) {
+      Scanner scanner = sm.getScanner(meta, schema, fragment, schema); // NOTE(review): not closed if init()/next() throws; consider try/finally
+      scanner.init();
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+    }
+
+    assertEquals(0, tupleCnt);
+  }
+
+  @Test
   public void testRCFileSplitable() throws IOException {
     if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
       Schema schema = new Schema();