You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/19 07:03:54 UTC
[iotdb] 01/02: fix some issue....
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9ac9044586490419b4be3a33857acac0507259b6
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 18 22:38:34 2022 +0800
fix some issue....
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/mpp/buffer/DataBlockManager.java | 4 ++--
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 14 ++++++++------
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 22 +++++++++++++++-------
.../apache/iotdb/db/mpp/execution/Coordinator.java | 8 ++++++--
.../iotdb/db/mpp/execution/QueryExecution.java | 20 +++++++++++++++++++-
.../mpp/execution/scheduler/ClusterScheduler.java | 7 +++++++
.../scheduler/SimpleFragInstanceDispatcher.java | 6 +++++-
.../db/mpp/operator/source/SeriesScanOperator.java | 3 ++-
.../db/mpp/schedule/FragmentInstanceScheduler.java | 2 +-
.../mpp/sql/planner/plan/DistributedQueryPlan.java | 11 +++++++++++
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 7 ++++++-
12 files changed, 83 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9540120af5..9a29c86d40 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -497,7 +497,7 @@ public class IoTDBConfig {
private String rpcImplClassName = TSServiceImpl.class.getName();
/** indicate whether current mode is mpp */
- private boolean mppMode = false;
+ private boolean mppMode = true;
/** Replace implementation class of influxdb protocol service */
private String influxdbImplClassName = InfluxDBServiceImpl.class.getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index a5278bdfd4..c6c642b1ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -264,7 +264,7 @@ public class DataBlockManager implements IDataBlockManager {
throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
}
- logger.info(
+ logger.debug(
"Create sink handle to plan node {} of {} for {}",
remotePlanNodeId,
remoteFragmentInstanceId,
@@ -301,7 +301,7 @@ public class DataBlockManager implements IDataBlockManager {
+ " exists.");
}
- logger.info(
+ logger.debug(
"Create source handle from {} for plan node {} of {}",
remoteFragmentInstanceId,
localPlanNodeId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 0beab26e36..b0204bd80e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -160,7 +160,7 @@ public class SinkHandle implements ISinkHandle {
}
private void sendEndOfDataBlockEvent() throws TException {
- logger.debug(
+ logger.info(
"Send end of data block event to plan node {} of {}. {}",
remotePlanNodeId,
remoteFragmentInstanceId,
@@ -220,10 +220,12 @@ public class SinkHandle implements ISinkHandle {
synchronized (this) {
sequenceIdToTsBlock.clear();
closed = true;
- localMemoryManager
- .getQueryPool()
- .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
- bufferRetainedSizeInBytes = 0;
+ if (bufferRetainedSizeInBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
+ }
}
sinkHandleListener.onAborted(this);
logger.info("Sink handle {} is aborted", this);
@@ -334,7 +336,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public void run() {
- logger.debug(
+ logger.info(
"Send new data block event [{}, {}) to plan node {} of {}.",
startSequenceId,
startSequenceId + blockSizes.size(),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 802634efa7..d1f1df86d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -120,6 +120,7 @@ public class SourceHandle implements ISourceHandle {
if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
blocked = SettableFuture.create();
+ logger.info("[SourceHandle {}]: blocked is set to new Future.", localPlanNodeId);
}
}
if (isFinished()) {
@@ -210,6 +211,7 @@ public class SourceHandle implements ISourceHandle {
}
synchronized void setNoMoreTsBlocks(int lastSequenceId) {
+ logger.info("[SourceHandle {}-{}]: No more TsBlock. {} ", localFragmentInstanceId, localPlanNodeId, remoteFragmentInstanceId);
this.lastSequenceId = lastSequenceId;
noMoreTsBlocks = true;
}
@@ -223,9 +225,14 @@ public class SourceHandle implements ISourceHandle {
@Override
public synchronized void close() {
+ logger.info("[SourceHandle {}-{}]: closed ", localFragmentInstanceId, localPlanNodeId);
if (closed) {
return;
}
+ if (blocked != null && !blocked.isDone()) {
+ blocked.cancel(true);
+ logger.info("[SourceHandle {}]: blocked is cancelled.", localPlanNodeId);
+ }
sequenceIdToDataBlockSize.clear();
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
@@ -307,13 +314,12 @@ public class SourceHandle implements ISourceHandle {
@Override
public void run() {
- logger.debug(
- "Get data blocks [{}, {}) from {} for plan node {} of {}.",
+ logger.info(
+ "[SourceHandle-{}]: Get data blocks [{}, {}) from {}",
+ localPlanNodeId,
startSequenceId,
endSequenceId,
- remoteFragmentInstanceId,
- localPlanNodeId,
- localFragmentInstanceId);
+ remoteFragmentInstanceId);
GetDataBlockRequest req =
new GetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId);
int attempt = 0;
@@ -335,6 +341,7 @@ public class SourceHandle implements ISourceHandle {
}
if (!blocked.isDone()) {
blocked.set(null);
+ logger.info("[SourceHandle {}]: blocked is set null.", localPlanNodeId);
}
}
executorService.submit(
@@ -375,8 +382,9 @@ public class SourceHandle implements ISourceHandle {
@Override
public void run() {
- logger.debug(
- "Send ack data block event [{}, {}) to {}.",
+ logger.info(
+ "[SourceHandle {}]: Send ack data block event [{}, {}) to {}.",
+ localPlanNodeId,
startSequenceId,
endSequenceId,
remoteFragmentInstanceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 0d45e76390..3d7a292e69 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -43,8 +45,10 @@ import java.util.concurrent.ScheduledExecutorService;
* QueryExecution.
*/
public class Coordinator {
+ private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
- private static final int COORDINATOR_EXECUTOR_SIZE = 2;
+ private static final int COORDINATOR_EXECUTOR_SIZE = 20;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
@@ -94,7 +98,7 @@ public class Coordinator {
partitionFetcher,
schemaFetcher);
queryExecutionMap.put(queryId, execution);
-
+ LOG.info("[Query: {}] start QueryExecution. Statement: {}", queryId, sql);
execution.start();
return execution.getStatus();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 172a82db70..9dcb3e5fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -43,6 +43,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
@@ -61,6 +63,8 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
* corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
*/
public class QueryExecution implements IQueryExecution {
+ private static final Logger LOG = LoggerFactory.getLogger(QueryExecution.class); // named after this class so its log lines are attributed to QueryExecution
+
private final MPPQueryContext context;
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
@@ -112,10 +116,12 @@ public class QueryExecution implements IQueryExecution {
public void start() {
doLogicalPlan();
doDistributedPlan();
+ LOG.info("[{}]: Distribution Plan: {}", this, distributedPlan);
if (context.getQueryType() == QueryType.READ) {
// The ResultHandle could only be initialized after distributed planning
initResultHandle();
}
+ LOG.info("[{}]: Start to schedule.", this);
schedule();
}
@@ -163,7 +169,12 @@ public class QueryExecution implements IQueryExecution {
}
/** Release the resources that current QueryExecution hold. */
- private void releaseResource() {}
+ private void releaseResource() {
+ // close ResultHandle to unblock client's getResult request
+ if (resultHandle != null && !resultHandle.isClosed()) {
+ resultHandle.close();
+ }
+ }
/**
* This method will be called by the request thread from client connection. This method will block
@@ -175,9 +186,12 @@ public class QueryExecution implements IQueryExecution {
@Override
public TsBlock getBatchResult() {
try {
+ LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId());
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
+ LOG.info("[QueryExecution {}]: unblock. Cancelled: {}, Done: {}", context.getQueryId(), blocked.isCancelled(), blocked.isDone());
if (resultHandle.isFinished()) {
+ LOG.info("[QueryExecution {}]: result is null", context.getQueryId());
return null;
}
return resultHandle.receive();
@@ -271,4 +285,8 @@ public class QueryExecution implements IQueryExecution {
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
}
+
+ public String toString() {
+ return String.format("QueryExecution[%s]", context.getQueryId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 8b819d6cad..47c57c873d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -43,6 +45,8 @@ import java.util.concurrent.ScheduledExecutorService;
* this scheduler.
*/
public class ClusterScheduler implements IScheduler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class);
+
private MPPQueryContext queryContext;
// The stateMachine of the QueryExecution owned by this QueryScheduler
private QueryStateMachine stateMachine;
@@ -79,13 +83,16 @@ public class ClusterScheduler implements IScheduler {
@Override
public void start() {
+ LOGGER.info("[{}] start to dispatch fragment instance. size: {}", queryContext.getQueryId(), instances.size());
stateMachine.transitionToDispatching();
Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
// NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
// So we need to start the state fetcher after the dispatching stage.
try {
+ LOGGER.info("[{}] wait dispatch to be finished", queryContext.getQueryId());
FragInstanceDispatchResult result = dispatchResultFuture.get();
+ LOGGER.info("[{}] dispatch finished: {}", queryContext.getQueryId(), result.isSuccessful()); // query id fills the first placeholder, dispatch outcome the second
if (!result.isSuccessful()) {
stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 286f3df3e6..b39ec04406 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
@@ -34,7 +36,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
private final ExecutorService executor;
public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
@@ -63,7 +65,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
TSendFragmentInstanceReq req =
new TSendFragmentInstanceReq(
new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ LOGGER.info("start to dispatch fragment instance: {}", instance.getId());
resp = client.sendFragmentInstance(req);
+ LOGGER.info("dispatch complete: {}", instance.getId());
if (!resp.accepted) {
break;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9e9946c323..b11f643810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -107,6 +107,7 @@ public class SeriesScanOperator implements DataSourceOperator {
return true;
}
}
+ System.out.println(String.format("[SeriesScanOperator-%s]: hasNext returned: %s", sourceId, hasCachedTsBlock));
return hasCachedTsBlock;
} catch (IOException e) {
throw new RuntimeException("Error happened while scanning the file", e);
@@ -115,7 +116,7 @@ public class SeriesScanOperator implements DataSourceOperator {
@Override
public boolean isFinished() {
- return finished || (finished = hasNext());
+ return finished || (finished = !hasNext());
}
private boolean readChunkData() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c7bdb95285..875894f47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -66,7 +66,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
private static final int MAX_CAPACITY = 1000; // TODO: load from config files
private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
- private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
+ private static final int QUERY_TIMEOUT_MS = 10_000; // TODO: load from config files or requests
private final ThreadGroup workerGroups;
private final List<AbstractExecutor> threads;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 2df53235ad..d9a64788f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -54,4 +54,15 @@ public class DistributedQueryPlan {
public List<FragmentInstance> getInstances() {
return instances;
}
+
+ public String toString() {
+ StringBuilder ret = new StringBuilder();
+ ret.append("Instance Count: ").append(instances.size());
+ ret.append(System.lineSeparator());
+ for (FragmentInstance instance : instances) {
+ ret.append(instance);
+ ret.append(System.lineSeparator());
+ }
+ return ret.toString();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 506f180049..aafc3ffd00 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
import org.apache.iotdb.db.mpp.execution.IQueryExecution;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
@@ -31,6 +32,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -42,7 +45,7 @@ import java.util.List;
/** TimeValuePairUtils to convert between thrift format and TsFile format. */
public class QueryDataSetUtils {
-
+ private static final Logger LOG = LoggerFactory.getLogger(QueryDataSetUtils.class);
private static final int FLAG = 0x01;
private QueryDataSetUtils() {}
@@ -193,7 +196,9 @@ public class QueryDataSetUtils {
int rowCount = 0;
int[] valueOccupation = new int[columnNum];
while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+ LOG.info("[ToTSDataSet {}] hasNext return true.", queryExecution);
TsBlock tsBlock = queryExecution.getBatchResult();
+ LOG.info("[ToTSDataSet {}] result got.", queryExecution);
if (tsBlock == null) {
break;
}