You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/05 07:15:56 UTC
[04/10] tajo git commit: TAJO-1362: Resolve findbug warnings on Tajo Core Module
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index fd52488..0d1924b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -129,6 +129,9 @@ public class QueryMasterTask extends CompositeService {
@Override
public void init(Configuration conf) {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("conf should be a TajoConf type.");
+ }
systemConf = (TajoConf)conf;
try {
@@ -377,8 +380,7 @@ public class QueryMasterTask extends CompositeService {
}
private void initStagingDir() throws IOException {
- Path stagingDir = null;
- FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
+ Path stagingDir;
try {
@@ -388,14 +390,7 @@ public class QueryMasterTask extends CompositeService {
LOG.info("The staging dir '" + stagingDir + "' is created.");
queryContext.setStagingDir(stagingDir);
} catch (IOException ioe) {
- if (stagingDir != null && defaultFS.exists(stagingDir)) {
- try {
- defaultFS.delete(stagingDir, true);
- LOG.info("The staging directory '" + stagingDir + "' is deleted");
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- }
- }
+ LOG.warn("Creating staging dir has been failed.", ioe);
throw ioe;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 7c2d9f4..8e9e343 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -82,7 +82,6 @@ public class Repartitioner {
public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
throws IOException {
- MasterPlan masterPlan = stage.getMasterPlan();
ExecutionBlock execBlock = stage.getBlock();
QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext();
@@ -231,12 +230,13 @@ public class Repartitioner {
String namePrefix = "";
long maxStats = Long.MIN_VALUE;
int maxStatsScanIdx = -1;
+ StringBuilder nonLeafScanNamesBuilder = new StringBuilder();
for (int i = 0; i < scans.length; i++) {
if (scans[i].getTableDesc().getMeta().getStoreType() == StoreType.RAW) {
// Intermediate data scan
hasNonLeafNode = true;
largeScanIndexList.add(i);
- nonLeafScanNames += namePrefix + scans[i].getCanonicalName();
+ nonLeafScanNamesBuilder.append(namePrefix).append(scans[i].getCanonicalName());
namePrefix = ",";
}
if (execBlock.isBroadcastTable(scans[i].getCanonicalName())) {
@@ -249,18 +249,19 @@ public class Repartitioner {
}
}
}
+ nonLeafScanNames = nonLeafScanNamesBuilder.toString();
if (maxStatsScanIdx == -1) {
maxStatsScanIdx = 0;
}
if (!hasNonLeafNode) {
if (largeScanIndexList.size() > 1) {
- String largeTableNames = "";
+ StringBuilder largeTableNamesBuilder = new StringBuilder();
for (Integer eachId : largeScanIndexList) {
- largeTableNames += scans[eachId].getTableName() + ",";
+ largeTableNamesBuilder.append(scans[eachId].getTableName()).append(',');
}
throw new IOException("Broadcast join with leaf node should have only one large table, " +
- "but " + largeScanIndexList.size() + ", tables=" + largeTableNames);
+ "but " + largeScanIndexList.size() + ", tables=" + largeTableNamesBuilder.toString());
}
int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0);
LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d",
@@ -494,6 +495,9 @@ public class Repartitioner {
public static List<Fragment> getFragmentsFromPartitionedTable(FileStorageManager sm,
ScanNode scan,
TableDesc table) throws IOException {
+ if (!(scan instanceof PartitionedTableScanNode)) {
+ throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
+ }
List<Fragment> fragments = Lists.newArrayList();
PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
fragments.addAll(sm.getSplits(
@@ -696,9 +700,14 @@ public class Repartitioner {
LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
" sub ranges (total units: " + determinedTaskNum + ")");
ranges = partitioner.partition(determinedTaskNum);
- if (ranges == null || ranges.length == 0) {
+ if (ranges == null) {
+ throw new NullPointerException("ranges is null on " + stage.getId() + " stage.");
+ }
+
+ if (ranges.length == 0) {
LOG.warn(stage.getId() + " no range infos.");
}
+
TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
if (LOG.isDebugEnabled()) {
if (ranges != null) {
@@ -985,7 +994,7 @@ public class Repartitioner {
String tableName) {
long splitVolume = StorageUnit.MB *
stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
- long pageSize = StorageUnit.MB *
+ long pageSize = ((long)StorageUnit.MB) *
stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
if (pageSize >= splitVolume) {
throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " +
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index efadaa7..4e1f716 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -865,7 +865,6 @@ public class Stage implements EventHandler<StageEvent> {
* @return
*/
public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) {
- TajoConf conf = stage.context.getConf();
MasterPlan masterPlan = stage.getMasterPlan();
ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
@@ -1156,6 +1155,9 @@ public class Stage implements EventHandler<StageEvent> {
@Override
public void transition(Stage stage, StageEvent event) {
+ if (!(event instanceof StageContainerAllocationEvent)) {
+ throw new IllegalArgumentException("event should be a StageContainerAllocationEvent type.");
+ }
try {
StageContainerAllocationEvent allocationEvent =
(StageContainerAllocationEvent) event;
@@ -1191,6 +1193,9 @@ public class Stage implements EventHandler<StageEvent> {
private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(Stage stage, StageEvent event) {
+ if (!(event instanceof StageContainerAllocationEvent)) {
+ throw new IllegalArgumentException("event should be a StageContainerAllocationEvent type.");
+ }
try {
StageContainerAllocationEvent allocationEvent =
(StageContainerAllocationEvent) event;
@@ -1213,6 +1218,9 @@ public class Stage implements EventHandler<StageEvent> {
@Override
public void transition(Stage stage,
StageEvent event) {
+ if (!(event instanceof StageTaskEvent)) {
+ throw new IllegalArgumentException("event should be a StageTaskEvent type.");
+ }
StageTaskEvent taskEvent = (StageTaskEvent) event;
Task task = stage.getTask(taskEvent.getTaskId());
@@ -1418,6 +1426,9 @@ public class Stage implements EventHandler<StageEvent> {
private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> {
@Override
public void transition(Stage stage, StageEvent event) {
+ if (!(event instanceof StageDiagnosticsUpdateEvent)) {
+ throw new IllegalArgumentException("event should be a StageDiagnosticsUpdateEvent type.");
+ }
stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index ad01b62..92f4b20 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -601,6 +601,9 @@ public class Task implements EventHandler<TaskEvent> {
@Override
public void transition(Task task,
TaskEvent event) {
+ if (!(event instanceof TaskTAttemptEvent)) {
+ throw new IllegalArgumentException("event should be a TaskTAttemptEvent type.");
+ }
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
@@ -618,6 +621,9 @@ public class Task implements EventHandler<TaskEvent> {
@Override
public void transition(Task task,
TaskEvent event) {
+ if (!(event instanceof TaskTAttemptEvent)) {
+ throw new IllegalArgumentException("event should be a TaskTAttemptEvent type.");
+ }
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.launchTime = System.currentTimeMillis();
@@ -629,6 +635,9 @@ public class Task implements EventHandler<TaskEvent> {
private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> {
@Override
public void transition(Task task, TaskEvent event) {
+ if (!(event instanceof TaskTAttemptEvent)) {
+ throw new IllegalArgumentException("event should be a TaskTAttemptEvent type.");
+ }
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
LOG.info("=============================================================");
LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
@@ -646,6 +655,9 @@ public class Task implements EventHandler<TaskEvent> {
@Override
public TaskState transition(Task task, TaskEvent taskEvent) {
+ if (!(taskEvent instanceof TaskTAttemptEvent)) {
+ throw new IllegalArgumentException("taskEvent should be a TaskTAttemptEvent type.");
+ }
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
task.failedAttempts++;
task.finishedAttempts++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
index 86c49b4..c1b9273 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -307,6 +307,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void transition(TaskAttempt taskAttempt,
TaskAttemptEvent event) {
+ if (!(event instanceof TaskAttemptAssignedEvent)) {
+ throw new IllegalArgumentException("event should be a TaskAttemptAssignedEvent type.");
+ }
TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
taskAttempt.containerId = castEvent.getContainerId();
taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
@@ -333,6 +336,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public TaskAttemptState transition(TaskAttempt taskAttempt,
TaskAttemptEvent event) {
+ if (!(event instanceof TaskAttemptStatusUpdateEvent)) {
+ throw new IllegalArgumentException("event should be a TaskAttemptStatusUpdateEvent type.");
+ }
TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
taskAttempt.progress = updateEvent.getStatus().getProgress();
@@ -371,6 +377,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void transition(TaskAttempt taskAttempt,
TaskAttemptEvent event) {
+ if (!(event instanceof TaskCompletionEvent)) {
+ throw new IllegalArgumentException("event should be a TaskCompletionEvent type.");
+ }
TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
try {
@@ -395,6 +404,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
@Override
public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+ if (!(event instanceof TaskFatalErrorEvent)) {
+ throw new IllegalArgumentException("event should be a TaskFatalErrorEvent type.");
+ }
TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
index 3147bb6..23d245b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -143,6 +143,9 @@ public class IndexUtil {
&& binaryEval.getRightExpr().getType() == EvalType.CONST) {
nodeList.add(binaryEval);
}
+ break;
+ default:
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 13f4dcc..875d12b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -97,6 +97,9 @@ public class JSPUtil {
}
public static String getTajoMasterHttpAddr(Configuration config) {
+ if (!(config instanceof TajoConf)) {
+ throw new IllegalArgumentException("config should be a TajoConf type.");
+ }
try {
TajoConf conf = (TajoConf) config;
String [] masterAddr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":");
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
index 868dfcd..6ba74d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java
@@ -127,6 +127,8 @@ public class HistoryCleaner extends Thread {
}
}
}
+ } catch (RuntimeException e) {
+ LOG.error(e.getMessage(), e);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 3fea3ef..b06c7e8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -84,6 +84,9 @@ public class HistoryWriter extends AbstractService {
@Override
public void serviceInit(Configuration conf) throws Exception {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("conf should be a TajoConf type.");
+ }
tajoConf = (TajoConf)conf;
historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
index 9e895b8..386fb79 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -21,9 +21,7 @@ package org.apache.tajo.util.metrics.reporter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
+import java.io.*;
public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter {
private static final Log LOG = LogFactory.getLog(MetricsFileScheduledReporter.class);
@@ -40,6 +38,7 @@ public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter
LOG.warn("No " + metricsPropertyKey + "filename property in tajo-metrics.properties");
return;
}
+
try {
File file = new File(fileName);
File parentFile = file.getParentFile();
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
index 4fbefd7..7f33792 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
@@ -36,7 +36,6 @@ public abstract class MetricsStreamScheduledReporter extends TajoMetricsSchedule
protected Locale locale;
protected Clock clock;
protected TimeZone timeZone;
- protected MetricFilter filter;
protected DateFormat dateFormat;
private final byte[] NEW_LINE = "\n".getBytes();
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
index a32a913..a7e0559 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
@@ -48,10 +48,12 @@ public abstract class TajoMetricsReporter {
groupName = keyTokens[0] + "." + keyTokens[1];
itemName = "";
String prefix = "";
+ StringBuilder itemNameBuilder = new StringBuilder();
for (int i = 2; i < keyTokens.length; i++) {
- itemName += prefix + keyTokens[i];
+ itemNameBuilder.append(prefix).append(keyTokens[i]);
prefix = ".";
}
+ itemName = itemNameBuilder.toString();
} else {
groupName = "";
itemName = key;
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
index f11d520..7e0ec4a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -142,11 +142,7 @@ public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter i
try {
report();
} catch (Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.warn("Metric report error:" + e.getMessage(), e);
- } else {
- LOG.warn("Metric report error:" + e.getMessage(), e);
- }
+ LOG.warn("Metric report error:" + e.getMessage(), e);
}
}
}, period, period, unit);
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index d8bb8f1..e9b6230 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -28,6 +28,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
+import java.io.NotSerializableException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -62,17 +63,26 @@ import java.util.concurrent.atomic.AtomicInteger;
public class QueryExecutorServlet extends HttpServlet {
private static final Log LOG = LogFactory.getLog(QueryExecutorServlet.class);
+ private static final long serialVersionUID = -1517586415463171579L;
- ObjectMapper om = new ObjectMapper();
+ transient ObjectMapper om = new ObjectMapper();
//queryRunnerId -> QueryRunner
//TODO We must handle the session.
- private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
+ private transient final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
- private TajoConf tajoConf;
- private TajoClient tajoClient;
+ private transient TajoConf tajoConf;
+ private transient TajoClient tajoClient;
- private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5);
+ private transient ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5);
+
+ private void writeObject(java.io.ObjectOutputStream stream) throws java.io.IOException {
+ throw new NotSerializableException( getClass().getName() );
+ }
+
+ private void readObject(java.io.ObjectInputStream stream) throws java.io.IOException, ClassNotFoundException {
+ throw new NotSerializableException( getClass().getName() );
+ }
@Override
public void init(ServletConfig config) throws ServletException {
@@ -135,10 +145,11 @@ public class QueryExecutorServlet extends HttpServlet {
if(!queryRunners.containsKey(queryRunnerId)) {
break;
}
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
+ }
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
}
}
String database = request.getParameter("database");
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
index 09426e0..6008aae 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -31,6 +31,8 @@ import java.net.Inet4Address;
public class StaticHttpServer extends HttpServer {
private static StaticHttpServer instance = null;
+ private static final Object lockObjectForStaticHttpServer = new Object();
+
private StaticHttpServer(Object containerObject , String name, String bindAddress, int port,
boolean findPort, Connector connector, Configuration conf,
String[] pathSpecs) throws IOException {
@@ -52,13 +54,17 @@ public class StaticHttpServer extends HttpServer {
addr = Inet4Address.getLocalHost().getHostName();
}
}
-
- instance = new StaticHttpServer(containerObject, name, addr, port,
- findPort, connector, conf, pathSpecs);
- instance.setAttribute("tajo.info.server.object", containerObject);
- instance.setAttribute("tajo.info.server.addr", addr);
- instance.setAttribute("tajo.info.server.conf", conf);
- instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis());
+
+ synchronized (lockObjectForStaticHttpServer) {
+ if (instance == null) {
+ instance = new StaticHttpServer(containerObject, name, addr, port,
+ findPort, connector, conf, pathSpecs);
+ instance.setAttribute("tajo.info.server.object", containerObject);
+ instance.setAttribute("tajo.info.server.addr", addr);
+ instance.setAttribute("tajo.info.server.conf", conf);
+ instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis());
+ }
+ }
}
return instance;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index 4b5a203..67114a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ExecutionBlockSharedResource {
private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
private AtomicBoolean initializing = new AtomicBoolean(false);
- private volatile Boolean resourceInitSuccess = new Boolean(false);
+ private volatile Boolean resourceInitSuccess = Boolean.valueOf(false);
// Query
private QueryContext context;
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index c6a06f0..827c860 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -106,6 +106,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void init(Configuration conf) {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("conf should be a TajoConf type.");
+ }
tajoConf = (TajoConf)conf;
queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
@@ -147,6 +150,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void handle(TaskRunnerGroupEvent event) {
if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
+ if (!(event instanceof LaunchTaskRunnersEvent)) {
+ throw new IllegalArgumentException("event should be a LaunchTaskRunnersEvent type.");
+ }
LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event;
launchTaskRunners(launchEvent);
} else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 7f73916..7e2a233 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -186,6 +186,9 @@ public class TajoWorker extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("conf should be a TajoConf type.");
+ }
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
this.systemConf = (TajoConf)conf;
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 23efffa..cf50767 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -113,6 +113,9 @@ public class TaskRunner extends AbstractService {
@Override
public void init(Configuration conf) {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("conf should be a TajoConf Type.");
+ }
this.systemConf = (TajoConf)conf;
try {
@@ -143,15 +146,17 @@ public class TaskRunner extends AbstractService {
this.finishTime = System.currentTimeMillis();
this.history.setFinishTime(finishTime);
// If this flag become true, taskLauncher will be terminated.
- this.stopped = true;
-
- fetchLauncher.shutdown();
- fetchLauncher = null;
LOG.info("Stop TaskRunner: " + getId());
synchronized (this) {
+ this.stopped = true;
+
+ fetchLauncher.shutdown();
+ fetchLauncher = null;
+
notifyAll();
}
+
super.stop();
this.history.setState(getServiceState());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 57ae566..570bd38 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -230,7 +230,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
break;
}
try {
- long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000;
+ long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l;
cleanExpiredFinishedQueryMasterTask(expireTime);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 870e9a0..462f95d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -111,7 +111,6 @@ public class WorkerHeartbeatService extends AbstractService {
int workerCpuCoreNum;
boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
- int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
try {
diskDeviceInfos = DiskUtil.getDiskDeviceInfos();