You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/10 03:02:37 UTC
tajo git commit: TAJO-1825: Remove zero length fragments when file length is zero.
Repository: tajo
Updated Branches:
refs/heads/master bd90521ef -> 7f7f4d119
TAJO-1825: Remove zero length fragments when file length is zero.
Closes #744
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7f7f4d11
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7f7f4d11
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7f7f4d11
Branch: refs/heads/master
Commit: 7f7f4d1195ad0d0b60059d9729f318556ed66a10
Parents: bd90521
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Sep 10 10:01:51 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Sep 10 10:01:51 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/client/v2/LegacyClientDelegate.java | 6 ++
.../apache/tajo/querymaster/TestQueryState.java | 88 ++++++++++++++++++++
.../java/org/apache/tajo/querymaster/Stage.java | 8 +-
.../main/java/org/apache/tajo/util/JSPUtil.java | 4 +
.../org/apache/tajo/storage/FileTablespace.java | 3 -
.../apache/tajo/storage/TestFileTablespace.java | 49 +++++++++++
.../org/apache/tajo/storage/TestStorages.java | 44 ++++++++++
8 files changed, 199 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9030e77..9dc0ca5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -36,6 +36,8 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1825: Remove zero length fragments when file length is zero. (jinho)
+
TAJO-1828: tajo-daemon scripts should kill process after process can not
stop gracefully. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
index 42edadf..0a2c6de 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
@@ -66,6 +66,12 @@ public class LegacyClientDelegate extends SessionConnection implements ClientDel
}
@Override
+ public void close() {
+ executor.shutdown();
+ super.close();
+ }
+
+ @Override
public int executeUpdate(String sql) throws TajoException {
queryClient.updateQuery(sql);
return 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
index a43491b..5175186 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
@@ -20,11 +20,13 @@ package org.apache.tajo.querymaster;
import org.apache.tajo.*;
import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.util.history.StageHistory;
import org.junit.AfterClass;
@@ -32,6 +34,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.sql.ResultSet;
import java.util.List;
import static org.junit.Assert.*;
@@ -100,4 +103,89 @@ public class TestQueryState {
/* get status from TajoMaster */
assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState());
}
+
+ @Test(timeout = 10000)
+ public void testEmptyTable() throws Exception {
+ //create empty table
+ client.executeQueryAndGetResult("create table lineitem_empty as select * from lineitem where l_orderkey = -1");
+ TableStats stats = client.getTableDesc("lineitem_empty").getStats();
+ assertEquals(0L, stats.getNumBytes().longValue());
+ assertEquals(0L, stats.getNumRows().longValue());
+
+ String queryStr = "select count(*) from lineitem_empty";
+ /*
+ Optimized master plan
+ -------------------------------------------------------------------------------
+ Execution Block Graph (TERMINAL - eb_1441688509247_0002_000003)
+ -------------------------------------------------------------------------------
+ |-eb_1441688509247_0002_000003
+ |-eb_1441688509247_0002_000002
+ |-eb_1441688509247_0002_000001
+ -------------------------------------------------------------------------------
+ Order of Execution
+ -------------------------------------------------------------------------------
+ 1: eb_1441688509247_0002_000001
+ 2: eb_1441688509247_0002_000002
+ 3: eb_1441688509247_0002_000003
+ -------------------------------------------------------------------------------
+
+ =======================================================
+ Block Id: eb_1441688509247_0002_000001 [LEAF]
+ =======================================================
+
+ [Outgoing]
+ [q_1441688509247_0002] 1 => 2 (type=HASH_SHUFFLE, key=, num=1)
+
+ GROUP_BY(5)()
+ => exprs: (count())
+ => target list: ?count_1 (INT8)
+ => out schema:{(1) ?count_1 (INT8)}
+ => in schema:{(0) }
+ SCAN(0) on default.lineitem_empty
+ => target list:
+ => out schema: {(0) }
+
+ */
+
+ ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
+ QueryId queryId = new QueryId(res.getQueryId());
+
+ QueryStatus queryState = client.getQueryStatus(queryId);
+ while (!TajoClientUtil.isQueryComplete(queryState.getState())) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ fail("Query state : " + queryState);
+ }
+ queryState = client.getQueryStatus(queryId);
+ }
+
+ ResultSet resultSet = client.getQueryResult(queryId);
+ assertTrue(resultSet.next());
+ assertEquals(0, resultSet.getLong(1));
+
+ QueryInfo queryInfo = cluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(queryId);
+ assertEquals(queryId, queryInfo.getQueryId());
+ assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, queryInfo.getQueryState());
+
+ QueryHistory history = cluster.getQueryHistory(queryId);
+ List<StageHistory> stages = history.getStageHistories();
+
+ assertFalse(stages.isEmpty());
+
+ for (StageHistory stage : stages) {
+ ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(stage.getExecutionBlockId());
+ //find leaf stage
+ if (executionBlockId.getId() == 1) {
+ assertEquals(0, stage.getTotalScheduledObjectsCount());
+ } else {
+ assertNotEquals(0, stage.getTotalScheduledObjectsCount());
+ }
+ assertEquals(StageState.SUCCEEDED.toString(), stage.getState());
+ }
+
+ /* get status from TajoMaster */
+ assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState());
+ client.executeQueryAndGetResult("drop table lineitem_empty purge");
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 0276cc2..1f98457 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -141,6 +141,10 @@ public class Stage implements EventHandler<StageEvent> {
.addTransition(StageState.INITED, StageState.INITED,
StageEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.INITED,
+ EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+ StageEventType.SQ_STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
.addTransition(StageState.INITED, StageState.KILL_WAIT,
StageEventType.SQ_KILL, new KillTasksTransition())
.addTransition(StageState.INITED, StageState.ERROR,
@@ -845,8 +849,8 @@ public class Stage implements EventHandler<StageEvent> {
LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
- stage.finalizeStage();
- stage.complete();
+ stage.eventHandler.handle(
+ new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
} else {
if(stage.getSynchronizedState() == StageState.INITED) {
stage.taskScheduler.start();
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 1d93c3c..919b2c2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -287,7 +287,11 @@ public class JSPUtil {
}
static final DecimalFormat PERCENT_FORMAT = new DecimalFormat("###.#");
+
public static String percentFormat(float value) {
+ if (Float.isInfinite(value) || Float.isNaN(value)) {
+ value = 0.0f;
+ }
return PERCENT_FORMAT.format(value * 100.0f);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index f79471a..f4c78b6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -556,9 +556,6 @@ public class FileTablespace extends Tablespace {
splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
}
}
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, path, 0, length));
}
}
if(LOG.isDebugEnabled()){
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index 37fbfe4..cecd4dd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -158,6 +158,55 @@ public class TestFileTablespace {
}
@Test
+ public void testZeroLengthSplit() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ cluster.waitClusterUp();
+ TajoConf tajoConf = new TajoConf(conf);
+ tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+ int testCount = 10;
+ Path tablePath = new Path("/testZeroLengthSplit");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test partitions
+ List<Path> partitions = Lists.newArrayList();
+ for (int i =0; i < testCount; i++){
+ Path tmpFile = new Path(tablePath, String.valueOf(i));
+
+ //creates zero length file
+ DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 0, (short) 2, 0xDEADDEADl);
+ partitions.add(tmpFile);
+ }
+
+ assertTrue(fs.exists(tablePath));
+ FileTablespace space = new FileTablespace("testZeroLengthSplit", fs.getUri());
+ space.init(new TajoConf(conf));
+ assertEquals(fs.getUri(), space.getUri());
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta("TEXT");
+
+ List<Fragment> splits = Lists.newArrayList();
+ // Get FileFragments in partition batch
+ splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+ assertEquals(0, splits.size());
+ fs.close();
+ } finally {
+ cluster.shutdown(true);
+ }
+ }
+
+ @Test
public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
http://git-wip-us.apache.org/repos/asf/tajo/blob/7f7f4d11/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 1051b3b..66d86f2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -40,6 +40,7 @@ import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rcfile.RCFile;
import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
import org.apache.tajo.util.CommonTestingUtil;
@@ -205,6 +206,49 @@ public class TestStorages {
}
@Test
+ public void testZeroRows() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("score", Type.FLOAT4);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ meta.setOptions(CatalogUtil.newDefaultProperty(storeType));
+ if (storeType.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_PROJECTION_AVRO_SCHEMA);
+ }
+
+ Path tablePath = new Path(testDir, "testZeroRows.data");
+ FileTablespace sm = TablespaceManager.getLocalFs();
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+ appender.close();
+
+ TableStats stat = appender.getStats();
+ assertEquals(0, stat.getNumRows().longValue());
+
+ if(internalType || BuiltinStorages.TEXT.equals(storeType)) {
+ FileStatus fileStatus = fs.getFileStatus(tablePath);
+ assertEquals(0, fileStatus.getLen());
+ }
+
+ List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, testDir);
+ int tupleCnt = 0;
+ for (Fragment fragment : splits) {
+ Scanner scanner = sm.getScanner(meta, schema, fragment, schema);
+ scanner.init();
+ while (scanner.next() != null) {
+ tupleCnt++;
+ }
+ scanner.close();
+ }
+
+ assertEquals(0, tupleCnt);
+ }
+
+ @Test
public void testRCFileSplitable() throws IOException {
if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
Schema schema = new Schema();