You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/01/27 16:09:51 UTC
[2/3] asterixdb git commit: [ASTERIXDB-2110] Introduce Cluster Controller Id
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 360975d..1ec7485 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,7 +25,6 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -37,14 +36,12 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.application.ICCApplication;
-import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.ICCContext;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -147,6 +144,8 @@ public class ClusterControllerService implements IControllerService {
private ShutdownRun shutdownCallback;
+ private final CcId ccId;
+
static {
ExitUtil.init();
}
@@ -182,7 +181,8 @@ public class ClusterControllerService implements IControllerService {
// Node manager is in charge of cluster membership management.
nodeManager = new NodeManager(this, ccConfig, resourceManager);
- jobIdFactory = new JobIdFactory();
+ ccId = ccConfig.getCcId();
+ jobIdFactory = new JobIdFactory(ccId);
deployedJobSpecIdFactory = new DeployedJobSpecIdFactory();
}
@@ -252,17 +252,18 @@ public class ClusterControllerService implements IControllerService {
}
}
- private Pair<String, Integer> getNCService(String nodeId) {
+ private InetSocketAddress getNCService(String nodeId) {
IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(nodeId);
- return Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
- ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT));
+ final int port = ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT);
+ return port == NCConfig.NCSERVICE_PORT_DISABLED ? null
+ : InetSocketAddress.createUnresolved(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS), port);
}
- private Map<String, Pair<String, Integer>> getNCServices() {
- Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+ private Map<String, InetSocketAddress> getNCServices() {
+ Map<String, InetSocketAddress> ncMap = new TreeMap<>();
for (String ncId : configManager.getNodeNames()) {
- Pair<String, Integer> ncService = getNCService(ncId);
- if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+ InetSocketAddress ncService = getNCService(ncId);
+ if (ncService != null) {
ncMap.put(ncId, ncService);
}
}
@@ -271,31 +272,19 @@ public class ClusterControllerService implements IControllerService {
private void connectNCs() {
getNCServices().forEach((key, value) -> {
- final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getLeft(),
- value.getRight(), key);
+ final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getHostString(),
+ value.getPort(), key);
executor.submit(triggerWork);
});
- serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
- @Override
- public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
- // no-op, we don't care
- LOGGER.log(Level.WARN, "Getting notified that node: " + nodeId + " has joined. and we don't care");
- }
-
- @Override
- public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
- LOGGER.log(Level.WARN, "Getting notified that nodes: " + deadNodeIds + " has failed");
- }
- });
}
public boolean startNC(String nodeId) {
- Pair<String, Integer> ncServiceAddress = getNCService(nodeId);
+ InetSocketAddress ncServiceAddress = getNCService(nodeId);
if (ncServiceAddress == null) {
return false;
}
- final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getLeft(),
- ncServiceAddress.getRight(), nodeId);
+ final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getHostString(),
+ ncServiceAddress.getPort(), nodeId);
executor.submit(startNc);
return true;
@@ -304,11 +293,9 @@ public class ClusterControllerService implements IControllerService {
private void terminateNCServices() throws Exception {
List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
getNCServices().forEach((key, value) -> {
- if (value.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
- ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getLeft(), value.getRight(), key);
- workQueue.schedule(shutdownWork);
- shutdownNCServiceWorks.add(shutdownWork);
- }
+ ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getHostString(), value.getPort(), key);
+ workQueue.schedule(shutdownWork);
+ shutdownNCServiceWorks.add(shutdownWork);
});
for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
shutdownWork.sync();
@@ -428,6 +415,10 @@ public class ClusterControllerService implements IControllerService {
return deployedJobSpecIdFactory;
}
+ public CcId getCcId() {
+ return ccId;
+ }
+
private final class ClusterControllerContext implements ICCContext {
private final ClusterTopology topology;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 367a1d5..742e2e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -105,7 +105,7 @@ public class NodeManager implements INodeManager {
try {
// TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
- ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
+ ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
} catch (IPCException e) {
throw HyracksDataException.create(e);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 3a38287..04a34af 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -57,7 +57,7 @@ public class RegisterNodeWork extends SynchronizableWork {
try {
LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
NodeControllerRemoteProxy nc =
- new NodeControllerRemoteProxy(
+ new NodeControllerRemoteProxy(ccs.getCcId(),
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
NodeControllerState state = new NodeControllerState(nc, reg);
INodeManager nodeManager = ccs.getNodeManager();
@@ -73,7 +73,6 @@ public class RegisterNodeWork extends SynchronizableWork {
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
- ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Node registration failed", e);
result = new CCNCFunctions.NodeRegistrationResult(null, e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index f1d9a4d..53998aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -63,17 +63,15 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
List<Exception> exceptions;
if (exceptionHistory == null) {
// couldn't be found
- long maxJobId = ccs.getJobIdFactory().maxJobId();
- exceptions = Collections.singletonList(jobId.getId() <= maxJobId
- ? HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId)
- : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId));
+ exceptions = Collections
+ .singletonList(HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId));
} else {
exceptions = exceptionHistory;
}
ccs.getExecutor().execute(() -> {
if (!exceptions.isEmpty()) {
- /**
+ /*
* only report the first exception because IResultCallback will only throw one exception
* anyway
*/
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 6fd321e..2307185 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.base;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -35,42 +36,44 @@ import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
public interface IClusterController {
- public void registerNode(NodeRegistration reg) throws Exception;
+ void registerNode(NodeRegistration reg) throws Exception;
- public void unregisterNode(String nodeId) throws Exception;
+ void unregisterNode(String nodeId) throws Exception;
- public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception;
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+ void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
throws Exception;
- public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
+ void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
- public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+ void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
- public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
+ void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
- public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
+ void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
- public void notifyShutdown(String nodeId) throws Exception;
+ void notifyShutdown(String nodeId) throws Exception;
- public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+ void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
- public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
+ void reportProfile(String id, List<JobProfile> profiles) throws Exception;
- public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
+ void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
- public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+ void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+ void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
- public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+ void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
- public void getNodeControllerInfos() throws Exception;
+ void getNodeControllerInfos() throws Exception;
- public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+ void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+
+ CcId getCcId();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 5d781cf..ef3b27c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -36,30 +36,30 @@ import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
- public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
- List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+ void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
throws Exception;
- public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
+ void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
- public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
+ void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
- public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
+ void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
- public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
+ void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
- public void undeployBinary(DeploymentId deploymentId) throws Exception;
+ void undeployBinary(DeploymentId deploymentId) throws Exception;
- public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
+ void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
- public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+ void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
- public void dumpState(String stateDumpId) throws Exception;
+ void dumpState(String stateDumpId) throws Exception;
- public void shutdown(boolean terminateNCService) throws Exception;
+ void shutdown(boolean terminateNCService) throws Exception;
- public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+ void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- public void takeThreadDump(String requestId) throws Exception;
+ void takeThreadDump(String requestId) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 62e6ee0..42ed1e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -81,6 +81,27 @@ public class OptionTypes {
}
};
+ /**
+  * Option type for 16-bit values (e.g. controller ids documented as 0-65535).
+  */
+ public static final IOptionType<Short> SHORT = new IOptionType<Short>() {
+     /**
+      * Parses a decimal, hexadecimal ({@code 0x..}) or octal string into a {@code short}.
+      * Any value representable in 16 bits is accepted, i.e. {@code -32768..65535}; values
+      * above {@link Short#MAX_VALUE} map to the negative {@code short} with the same bit pattern.
+      *
+      * @param s the textual value, in any format understood by {@link Integer#decode(String)}
+      * @return the parsed value narrowed to a {@code short}
+      * @throws NumberFormatException if {@code s} is not a valid integer literal
+      * @throws IllegalArgumentException if the value does not fit in 16 bits
+      */
+     @Override
+     public Short parse(String s) {
+         int value = Integer.decode(s);
+         // Explicit range check: Integer.highestOneBit returns the bit's value (a power of two),
+         // not its index, so comparing it against 16 rejected every input >= 32 while letting
+         // arbitrarily large negative inputs through to be truncated silently by the cast.
+         if (value < Short.MIN_VALUE || value > 0xFFFF) {
+             throw new IllegalArgumentException("The given value " + s + " does not fit in 16 bits");
+         }
+         return (short)value;
+     }
+
+     /** @return {@code Short.class}, the boxed type produced by {@link #parse(String)} */
+     @Override
+     public Class<Short> targetType() {
+         return Short.class;
+     }
+
+     /** Writes {@code value} (cast to {@code short}) into {@code node} under {@code fieldName}. */
+     @Override
+     public void serializeJSONField(String fieldName, Object value, ObjectNode node) {
+         node.put(fieldName, (short)value);
+     }
+ };
+
public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() {
@Override
public Integer parse(String s) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 470e87c..85731b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.controllers;
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.SHORT;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import java.io.File;
@@ -33,6 +34,7 @@ import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.util.file.FileUtil;
import org.ini4j.Ini;
@@ -67,7 +69,8 @@ public class CCConfig extends ControllerConfig {
JOB_QUEUE_CAPACITY(INTEGER, 4096),
JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
- CORES_MULTIPLIER(INTEGER, 3);
+ CORES_MULTIPLIER(INTEGER, 3),
+ CONTROLLER_ID(SHORT, (short)0x0000);
private final IOptionType parser;
private Object defaultValue;
@@ -164,6 +167,8 @@ public class CCConfig extends ControllerConfig {
+ "bad behaving operators";
case CORES_MULTIPLIER:
return "Specifies the multiplier to use on the cluster available cores";
+ case CONTROLLER_ID:
+ return "The 16-bit (0-65535) id of this Cluster Controller";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -374,4 +379,8 @@ public class CCConfig extends ControllerConfig {
public int getCoresMultiplier() {
return getAppConfig().getInt(Option.CORES_MULTIPLIER);
}
+
+ public CcId getCcId() { // cluster controller id as configured through Option.CONTROLLER_ID
+ return CcId.valueOf(getAppConfig().getShort(Option.CONTROLLER_ID)); // wraps the short option value in a CcId
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 0a5ba30..95c063f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.controllers;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.SHORT;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY;
@@ -33,6 +34,7 @@ import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.util.file.FileUtil;
@@ -48,6 +50,7 @@ public class NCConfig extends ControllerConfig {
NCSERVICE_PORT(INTEGER, 9090),
CLUSTER_ADDRESS(STRING, (String) null),
CLUSTER_PORT(INTEGER, 1099),
+ CLUSTER_CONTROLLER_ID(SHORT, (short)0x0000),
CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
NODE_ID(STRING, (String) null),
@@ -141,6 +144,8 @@ public class NCConfig extends ControllerConfig {
return "Cluster Controller port";
case CLUSTER_LISTEN_PORT:
return "IP port to bind cluster listener";
+ case CLUSTER_CONTROLLER_ID:
+ return "16-bit (0-65535) id of the Cluster Controller";
case CLUSTER_PUBLIC_ADDRESS:
return "Public IP Address to announce cluster listener";
case CLUSTER_PUBLIC_PORT:
@@ -308,6 +313,10 @@ public class NCConfig extends ControllerConfig {
configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
}
+ public CcId getClusterControllerId() { // cluster controller id as configured through Option.CLUSTER_CONTROLLER_ID
+ return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID)); // wraps the short option value in a CcId
+ }
+
public String getClusterListenAddress() {
return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index dca8c07..5c6d078 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -38,6 +38,7 @@ import java.util.Set;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -150,12 +151,25 @@ public class CCNCFunctions {
}
- public static abstract class Function implements Serializable {
+ public abstract static class Function implements Serializable {
private static final long serialVersionUID = 1L;
public abstract FunctionId getFunctionId();
}
+ public abstract static class CCIdentifiedFunction extends Function { // base for functions that carry a cluster controller id
+ private static final long serialVersionUID = 1L;
+ private final CcId ccId; // set once at construction; presumably the id of the sending CC -- confirm against callers
+
+ protected CCIdentifiedFunction(CcId ccId) { // subclasses pass the id through super(ccId)
+ this.ccId = ccId;
+ }
+
+ public CcId getCcId() { // returns the id given at construction
+ return ccId;
+ }
+ }
+
public static class RegisterNodeFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -668,24 +682,33 @@ public class CCNCFunctions {
}
}
- //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110
public static class AbortCCJobsFunction extends Function {
private static final long serialVersionUID = 1L;
+ private final CcId ccId;
+
+ public AbortCCJobsFunction(CcId ccId) {
+ this.ccId = ccId;
+ }
@Override
public FunctionId getFunctionId() {
return FunctionId.ABORT_ALL_JOBS;
}
+
+ public CcId getCcId() {
+ return ccId;
+ }
}
- public static class DeployJobSpecFunction extends Function {
+ public static class DeployJobSpecFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final DeployedJobSpecId deployedJobSpecId;
private final byte[] acgBytes;
- public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) {
+ public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) {
+ super(ccId);
this.deployedJobSpecId = deployedJobSpecId;
this.acgBytes = acgBytes;
}
@@ -704,12 +727,13 @@ public class CCNCFunctions {
}
}
- public static class UndeployJobSpecFunction extends Function {
+ public static class UndeployJobSpecFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final DeployedJobSpecId deployedJobSpecId;
- public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+ public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, CcId ccId) {
+ super(ccId);
this.deployedJobSpecId = deployedJobSpecId;
}
@@ -724,7 +748,7 @@ public class CCNCFunctions {
}
public static class StartTasksFunction extends Function {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final DeploymentId deploymentId;
private final JobId jobId;
@@ -1008,11 +1032,12 @@ public class CCNCFunctions {
}
}
- public static class ThreadDumpRequestFunction extends Function {
+ public static class ThreadDumpRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final String requestId;
- public ThreadDumpRequestFunction(String requestId) {
+ public ThreadDumpRequestFunction(String requestId, CcId ccId) {
+ super(ccId);
this.requestId = requestId;
}
@@ -1106,13 +1131,14 @@ public class CCNCFunctions {
}
}
- public static class DeployBinaryFunction extends Function {
+ public static class DeployBinaryFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final List<URL> binaryURLs;
private final DeploymentId deploymentId;
- public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs) {
+ public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
+ super(ccId);
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
}
@@ -1131,12 +1157,13 @@ public class CCNCFunctions {
}
}
- public static class UnDeployBinaryFunction extends Function {
+ public static class UnDeployBinaryFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final DeploymentId deploymentId;
- public UnDeployBinaryFunction(DeploymentId deploymentId) {
+ public UnDeployBinaryFunction(DeploymentId deploymentId, CcId ccId) {
+ super(ccId);
this.deploymentId = deploymentId;
}
@@ -1211,12 +1238,13 @@ public class CCNCFunctions {
}
}
- public static class StateDumpRequestFunction extends Function {
+ public static class StateDumpRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final String stateDumpId;
- public StateDumpRequestFunction(String stateDumpId) {
+ public StateDumpRequestFunction(String stateDumpId, CcId ccId) {
+ super(ccId);
this.stateDumpId = stateDumpId;
}
@@ -1265,12 +1293,13 @@ public class CCNCFunctions {
}
}
- public static class ShutdownRequestFunction extends Function {
+ public static class ShutdownRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final boolean terminateNCService;
- public ShutdownRequestFunction(boolean terminateNCService) {
+ public ShutdownRequestFunction(boolean terminateNCService, CcId ccId) {
+ super(ccId);
this.terminateNCService = terminateNCService;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index f2e7d87..e4e2dbe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.ipc;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -56,9 +57,11 @@ import org.apache.hyracks.ipc.api.IIPCHandle;
public class ClusterControllerRemoteProxy implements IClusterController {
+ private final CcId ccId;
private IIPCHandle ipcHandle;
- public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+ public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
+ this.ccId = ccId;
this.ipcHandle = ipcHandle;
}
@@ -181,4 +184,14 @@ public class ClusterControllerRemoteProxy implements IClusterController {
threadDumpJSON);
ipcHandle.send(-1, tdrf, null);
}
+
+ /**
+  * Returns the cluster controller id that was handed to this proxy's constructor.
+  */
+ @Override
+ public CcId getCcId() {
+ return ccId;
+ }
+
+ /**
+  * Renders this proxy as {@code <simple class name> <cc id> [<remote address of the IPC handle>]}.
+  */
+ @Override
+ public String toString() {
+     final StringBuilder description = new StringBuilder(getClass().getSimpleName());
+     description.append(" ").append(ccId);
+     description.append(" [").append(ipcHandle.getRemoteAddress()).append("]");
+     return description.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b6b9b4b..a09a8bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -51,9 +52,11 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
import org.apache.hyracks.ipc.api.IIPCHandle;
public class NodeControllerRemoteProxy implements INodeController {
+ private final CcId ccId;
private final IIPCHandle ipcHandle;
- public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+ public NodeControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
+ this.ccId = ccId;
this.ipcHandle = ipcHandle;
}
@@ -88,37 +91,37 @@ public class NodeControllerRemoteProxy implements INodeController {
@Override
public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
- DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
+ DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId);
ipcHandle.send(-1, rpaf, null);
}
@Override
public void undeployBinary(DeploymentId deploymentId) throws Exception {
- UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
+ UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId, ccId);
ipcHandle.send(-1, rpaf, null);
}
@Override
public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
- DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes);
+ DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId);
ipcHandle.send(-1, fn, null);
}
@Override
public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
- UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId);
+ UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId, ccId);
ipcHandle.send(-1, fn, null);
}
@Override
public void dumpState(String stateDumpId) throws Exception {
- StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
+ StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId, ccId);
ipcHandle.send(-1, dsf, null);
}
@Override
public void shutdown(boolean terminateNCService) throws Exception {
- ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
+ ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService, ccId);
ipcHandle.send(-1, sdrf, null);
}
@@ -131,7 +134,7 @@ public class NodeControllerRemoteProxy implements INodeController {
@Override
public void takeThreadDump(String requestId) throws Exception {
- ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
+ ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId);
ipcHandle.send(-1, fn, null);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 94e86dd..9670e42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -58,7 +59,7 @@ public class BaseNCApplication implements INCApplication {
}
@Override
- public void onRegisterNode() throws Exception {
+ public void onRegisterNode(CcId ccId) throws Exception {
// no-op
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8cb33ca..8790434 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -222,15 +222,10 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
public void close() {
long stillAllocated = memoryAllocation.get();
if (stillAllocated > 0) {
- LOGGER.warn("Freeing leaked " + stillAllocated + " bytes");
+ LOGGER.info(() -> "Freeing leaked " + stillAllocated + " bytes");
serviceCtx.getMemoryManager().deallocate(stillAllocated);
}
- nodeController.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- deallocatableRegistry.close();
- }
- });
+ nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
}
ByteBuffer allocateFrame() throws HyracksDataException {
@@ -298,7 +293,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
for (PartitionId pid : pids) {
partitionRequestMap.put(pid, collector);
PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
- nodeController.getClusterController().registerPartitionRequest(req);
+ nodeController.getClusterController(jobId.getCcId()).registerPartitionRequest(req);
}
}
@@ -326,7 +321,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
close();
cleanupPending = false;
try {
- nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
+ nodeController.getClusterController(jobId.getCcId()).notifyJobletCleanup(jobId, nodeController.getId());
} catch (Exception e) {
e.printStackTrace();
}
@@ -341,4 +336,5 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
public ClassLoader getClassLoader() throws HyracksException {
return DeploymentUtils.getClassLoader(deploymentId, serviceCtx);
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index b220039..f55e250 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -72,7 +72,8 @@ final class NodeControllerIPCI implements IIPCI {
ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
return;
case ABORT_ALL_JOBS:
- ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs));
+ CCNCFunctions.AbortCCJobsFunction aajf = (CCNCFunctions.AbortCCJobsFunction) fn;
+ ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, aajf.getCcId()));
return;
case CLEANUP_JOBLET:
CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
@@ -97,27 +98,29 @@ final class NodeControllerIPCI implements IIPCI {
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs()));
+ ncs.getWorkQueue()
+ .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId()));
return;
case UNDEPLOY_BINARY:
CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
- ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
+ ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId()));
return;
case DISTRIBUTE_JOB:
CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn;
- ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes()));
+ ncs.getWorkQueue().schedule(
+ new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId()));
return;
case DESTROY_JOB:
CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn;
- ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId()));
+ ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId()));
return;
case STATE_DUMP_REQUEST:
final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
- ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
+ ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId(), dsrf.getCcId()));
return;
case SHUTDOWN_REQUEST:
@@ -127,7 +130,7 @@ final class NodeControllerIPCI implements IIPCI {
case THREAD_DUMP_REQUEST:
final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
- ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+ ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId()));
return;
default:
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 18a6b20..24d72f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -29,12 +29,14 @@ import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -46,6 +48,7 @@ import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -90,6 +93,7 @@ import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.InvokeUtil;
import org.apache.hyracks.util.PidHelper;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.Tracer;
@@ -128,7 +132,9 @@ public class NodeControllerService implements IControllerService {
private Exception registrationException;
- private IClusterController ccs;
+ private IClusterController primaryCcs;
+
+ private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>());
private final Map<JobId, Joblet> jobletMap;
@@ -140,7 +146,9 @@ public class NodeControllerService implements IControllerService {
private NodeParameters nodeParameters;
- private Thread heartbeatThread;
+ private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>();
+
+ private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>();
private final ServerContext serverCtx;
@@ -180,6 +188,8 @@ public class NodeControllerService implements IControllerService {
ExitUtil.init();
}
+ private NCShutdownHook ncShutdownHook;
+
public NodeControllerService(NCConfig config) throws Exception {
this(config, getApplication(config));
}
@@ -201,13 +211,14 @@ public class NodeControllerService implements IControllerService {
LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager());
}
// Set shutdown hook before so it doesn't have the same uncaught exception handler
- Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
+ ncShutdownHook = new NCShutdownHook(this);
+ Runtime.getRuntime().addShutdownHook(ncShutdownHook);
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
ioManager =
new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
- jobletMap = new Hashtable<>();
+ jobletMap = new ConcurrentHashMap<>();
deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -235,13 +246,6 @@ public class NodeControllerService implements IControllerService {
return lccm;
}
- synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
- this.nodeParameters = parameters;
- this.registrationException = exception;
- this.registrationPending = false;
- notifyAll();
- }
-
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
synchronized (getNodeControllerInfosAcceptor) {
@@ -250,7 +254,7 @@ public class NodeControllerService implements IControllerService {
}
getNodeControllerInfosAcceptor.setValue(fv);
}
- ccs.getNodeControllerInfos();
+ primaryCcs.getNodeControllerInfos();
return fv.get();
}
@@ -297,79 +301,142 @@ public class NodeControllerService implements IControllerService {
if (messagingNetManager != null) {
messagingNetManager.start();
}
- this.ccs = new ClusterControllerRemoteProxy(
- ipc.getHandle(
- new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
- ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() {
- @Override
- public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
- // we need to re-register in case of NC -> CC connection reset
- try {
- registerNode();
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Failed Registering with cc", e);
- throw new IPCException(e);
- }
- }
- }));
- registerNode();
+
+ final InetSocketAddress ccAddress = new InetSocketAddress(ncConfig.getClusterAddress(),
+ ncConfig.getClusterPort());
+ this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
workQueue.start();
// Schedule tracing a human-readable datetime
timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
- if (nodeParameters.getProfileDumpPeriod() > 0) {
- // Schedule profile dump generator.
- timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+ LOGGER.log(Level.INFO, "Started NodeControllerService");
+ application.startupCompleted();
+ }
+
+ /**
+  * Obtains an IPC handle to the cluster controller at {@code ccAddress}, registers this node with it and
+  * stores the resulting proxy in {@code ccsMap} under {@code ccId}. The whole sequence runs while holding
+  * the {@code ccsMap} monitor.
+  *
+  * @param ccId id under which the new cluster controller proxy is stored
+  * @param ccAddress address passed to {@code ipc.getHandle} to obtain the IPC handle
+  * @return the proxy that was created and registered
+  * @throws IllegalStateException if {@code ccsMap} already contains {@code ccId}
+  * @throws Exception whatever {@code ipc.getHandle} or {@code registerNode} throws
+  */
+ public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
+ ClusterControllerRemoteProxy ccProxy;
+ synchronized (ccsMap) {
+ if (ccsMap.containsKey(ccId)) {
+ throw new IllegalStateException("cc already registered: " + ccId);
+ }
+ // listener handed to ipc.getHandle; on a restored handle it looks the proxy up again by id
+ final IIPCEventListener ipcEventListener = new IIPCEventListener() {
+ @Override
+ public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+ // we need to re-register in case of NC -> CC connection reset
+ try {
+ registerNode(ccsMap.get(ccId));
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+ throw new IPCException(e);
+ }
+ }
+ };
+ ccProxy = new ClusterControllerRemoteProxy(ccId,
+ ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
+ // the proxy is only put into ccsMap after registerNode has returned without throwing
+ registerNode(ccProxy);
+ ccsMap.put(ccId, ccProxy);
}
+ return ccProxy;
+ }
- // Start heartbeat generator.
- heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
- heartbeatThread.setPriority(Thread.MAX_PRIORITY);
- heartbeatThread.setDaemon(true);
- heartbeatThread.start();
+ /**
+  * Makes the cluster controller stored under {@code ccId} the primary one.
+  *
+  * @param ccId id of a cluster controller present in {@code ccsMap}
+  * @throws IllegalArgumentException if {@code ccsMap} has no entry for {@code ccId}
+  */
+ public void makePrimaryCc(CcId ccId) throws Exception {
+     synchronized (ccsMap) {
+         final boolean registered = ccsMap.containsKey(ccId);
+         if (registered) {
+             primaryCcs = ccsMap.get(ccId);
+         } else {
+             throw new IllegalArgumentException("unknown cc: " + ccId);
+         }
+     }
+ }
- LOGGER.log(Level.INFO, "Started NodeControllerService");
- application.startupCompleted();
+ /**
+  * Detaches this node from the non-primary cluster controller stored under {@code ccId}: unregisters the
+  * node there, interrupts its heartbeat thread, cancels its timer and removes the proxy from {@code ccsMap}.
+  *
+  * @param ccId id of the cluster controller to remove
+  * @throws IllegalArgumentException if {@code ccsMap} has no entry for {@code ccId}
+  * @throws IllegalStateException if the entry is the current primary cluster controller
+  * @throws Exception if {@code unregisterNode} fails; no local state has been changed at that point
+  */
+ public void removeCc(CcId ccId) throws Exception {
+     synchronized (ccsMap) {
+         final IClusterController ccs = ccsMap.get(ccId);
+         if (ccs == null) {
+             throw new IllegalArgumentException("unknown cc: " + ccId);
+         }
+         if (primaryCcs.equals(ccs)) {
+             throw new IllegalStateException("cannot remove primary cc: " + ccId);
+         }
+         // TODO(mblow): consider how to handle running jobs
+         ccs.unregisterNode(id);
+         // guard against a missing heartbeat thread the same way the timer is guarded below
+         final Thread hbThread = heartbeatThreads.remove(ccs);
+         if (hbThread != null) {
+             hbThread.interrupt();
+         }
+         final Timer ccTimer = ccTimers.remove(ccs);
+         if (ccTimer != null) {
+             ccTimer.cancel();
+         }
+         // forget the proxy: otherwise the ipcHandleRestored listener would re-register with this CC,
+         // the shutdown loop over ccsMap.values() would still notify it, and addCc could never reuse the id
+         ccsMap.remove(ccId);
+     }
}
- public void registerNode() throws Exception {
- LOGGER.info("Registering with Cluster Controller");
+ protected void registerNode(IClusterController ccs) throws Exception {
+ LOGGER.info("Registering with Cluster Controller {}", ccs);
registrationPending = true;
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- // Use "public" versions of network addresses and ports
+ // Use "public" versions of network addresses and ports, if defined
+ InetSocketAddress ncAddress;
+ if (ncConfig.getClusterPublicPort() == 0) {
+ ncAddress = ipc.getSocketAddress();
+ } else {
+ ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
+ }
NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
- NetworkAddress meesagingPort =
+ NetworkAddress messagingAddress =
messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
int allCores = osMXBean.getAvailableProcessors();
- nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+ nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, application.getCapacity(),
+ runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
PidHelper.getPid(), maxJobId.get());
ccs.registerNode(nodeRegistration);
- synchronized (this) {
- while (registrationPending) {
- wait();
- }
+ completeNodeRegistration(ccs);
+
+ // Start heartbeat generator.
+ if (!heartbeatThreads.containsKey(ccs)) {
+ Thread heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()),
+ id + "-Heartbeat");
+ heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+ heartbeatThread.setDaemon(true);
+ heartbeatThread.start();
+ heartbeatThreads.put(ccs, heartbeatThread);
+ }
+ if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) {
+ Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+ // Schedule profile dump generator.
+ ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+ ccTimers.put(ccs, ccTimer);
+ }
+
+ LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+ }
+
+ /**
+  * Records the outcome of a registration attempt and wakes up every thread waiting on this object's
+  * monitor; completeNodeRegistration waits there until {@code registrationPending} is cleared.
+  *
+  * @param parameters value stored in {@code nodeParameters}
+  * @param exception value stored in {@code registrationException}; completeNodeRegistration rethrows it
+  *        when it is not null
+  */
+ synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ this.nodeParameters = parameters;
+ this.registrationException = exception;
+ // clear the flag before notifying so that the waiters' loop condition is false when they wake up
+ this.registrationPending = false;
+ notifyAll();
+ }
+
+ private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception {
+ while (registrationPending) {
+ wait();
}
if (registrationException != null) {
- LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception",
- registrationException);
+ LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException);
throw registrationException;
}
serviceCtx.setDistributedState(nodeParameters.getDistributedState());
- application.onRegisterNode();
- LOGGER.info("Registering with Cluster Controller complete");
+ application.onRegisterNode(ccs.getCcId());
}
private void startApplication() throws Exception {
@@ -404,17 +471,21 @@ public class NodeControllerService implements IControllerService {
workQueue.stop();
application.stop();
/*
- * Stop heartbeat after NC has stopped to avoid false node failure detection
+ * Stop heartbeats only after NC has stopped to avoid false node failure detection
* on CC if an NC takes a long time to stop.
*/
- if (heartbeatThread != null) {
- heartbeatThread.interrupt();
- heartbeatThread.join(1000); // give it 1s to stop gracefully
- }
- try {
- ccs.notifyShutdown(id);
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
+ heartbeatThreads.values().parallelStream().forEach(t -> {
+ t.interrupt();
+ InvokeUtil.doUninterruptibly(() -> t.join(1000));
+ });
+ synchronized (ccsMap) {
+ ccsMap.values().parallelStream().forEach(ccs -> {
+ try {
+ ccs.notifyShutdown(id);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
+ }
+ });
}
ipc.stop();
@@ -423,6 +494,14 @@ public class NodeControllerService implements IControllerService {
LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
new Exception("Duplicate shutdown call"));
}
+ if (ncShutdownHook != null) {
+ try {
+ Runtime.getRuntime().removeShutdownHook(ncShutdownHook);
+ LOGGER.info("removed shutdown hook for {}", id);
+ } catch (IllegalStateException e) {
+ LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e);
+ }
+ }
}
public String getId() {
@@ -488,8 +567,12 @@ public class NodeControllerService implements IControllerService {
return partitionManager;
}
- public IClusterController getClusterController() {
- return ccs;
+ /**
+  * Returns the cluster controller proxy currently held in {@code primaryCcs}.
+  */
+ public IClusterController getPrimaryClusterController() {
+ return primaryCcs;
+ }
+
+ /**
+  * Looks up the cluster controller proxy stored in {@code ccsMap} under {@code ccId}.
+  *
+  * @param ccId id of the cluster controller to look up
+  * @return the proxy, or {@code null} when {@code ccsMap} has no entry for {@code ccId}
+  */
+ public IClusterController getClusterController(CcId ccId) {
+ return ccsMap.get(ccId);
}
public NodeParameters getNodeParameters() {
@@ -619,7 +702,7 @@ public class NodeControllerService implements IControllerService {
public void run() {
try {
FutureValue<List<JobProfile>> fv = new FutureValue<>();
- BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
workQueue.scheduleAndSync(bjpw);
List<JobProfile> profiles = fv.get();
if (!profiles.isEmpty()) {
@@ -651,8 +734,8 @@ public class NodeControllerService implements IControllerService {
}
}
- public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
- ccs.sendApplicationMessageToCC(data, deploymentId, id);
+ /**
+  * Sends an application message to the cluster controller stored in {@code ccsMap} under {@code ccId},
+  * passing this node's id along.
+  *
+  * @param ccId id of the destination cluster controller
+  * @param data message payload
+  * @param deploymentId deployment id forwarded with the message
+  * @throws IllegalArgumentException if {@code ccsMap} has no entry for {@code ccId}
+  * @throws Exception whatever the cluster controller proxy throws while sending
+  */
+ public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
+     final IClusterController ccs = ccsMap.get(ccId);
+     if (ccs == null) {
+         // same failure mode as makePrimaryCc/removeCc instead of an anonymous NullPointerException
+         throw new IllegalArgumentException("unknown cc: " + ccId);
+     }
+     ccs.sendApplicationMessageToCC(data, deploymentId, id);
}
public IDatasetPartitionManager getDatasetPartitionManager() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 34ddd6a..07bb504 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -437,12 +437,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
@Override
public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
- this.ncs.sendApplicationMessageToCC(message, deploymentId);
+ this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(), message, deploymentId);
}
@Override
public void sendApplicationMessageToCC(Serializable message, DeploymentId deploymentId) throws Exception {
- this.ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), deploymentId);
+ this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(),
+ JavaSerializationUtils.serialize(message), deploymentId);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 476aeae..fb7308e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -93,10 +93,10 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
boolean orderedResult, boolean emptyResult) throws HyracksException {
try {
// Be sure to send the *public* network address to the CC
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
- partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
+ ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
+ emptyResult, partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
} catch (Exception e) {
- throw new HyracksException(e);
+ throw HyracksException.create(e);
}
}
@@ -105,9 +105,9 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
try {
LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+ ":partition: " + partition);
- ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
} catch (Exception e) {
- throw new HyracksException(e);
+ throw HyracksException.create(e);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index b9d2f4d..4787a50 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -96,7 +96,7 @@ public class MaterializedPartitionWriter implements IFrameWriter {
ctx.getIoManager().close(handle);
}
if (!failed) {
- manager.registerPartition(pid, taId,
+ manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId,
new MaterializedPartition(ctx, fRef, executor, ctx.getIoManager()),
PartitionState.COMMITTED, taId.getAttempt() == 0 ? false : true);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 57eba53..147606d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -188,7 +188,8 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
eos = false;
failed = false;
deallocated = false;
- manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+ manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED,
+ false);
}
private void checkOrCreateFile() throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 667cfa3..bb69eec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -58,10 +59,10 @@ public class PartitionManager {
this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
}
- public synchronized void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition,
+ public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
PartitionState state, boolean updateToCC) throws HyracksDataException {
try {
- /**
+ /*
* process pending requests
*/
NetworkOutputChannel writer = partitionRequests.remove(pid);
@@ -73,24 +74,20 @@ public class PartitionManager {
}
}
- /**
+ /*
* put a coming available partition into the available partition map
*/
- List<IPartition> pList = availablePartitionMap.get(pid);
- if (pList == null) {
- pList = new ArrayList<>();
- availablePartitionMap.put(pid, pList);
- }
+ List<IPartition> pList = availablePartitionMap.computeIfAbsent(pid, k -> new ArrayList<>());
pList.add(partition);
- /**
+ /*
* update to CC only when necessary
*/
if (updateToCC) {
- updatePartitionState(pid, taId, partition, state);
+ updatePartitionState(ccId, pid, taId, partition, state);
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -128,7 +125,7 @@ public class PartitionManager {
partitionRequests.put(partitionId, writer);
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -140,14 +137,15 @@ public class PartitionManager {
deallocatableRegistry.close();
}
- public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+ PartitionState state)
throws HyracksDataException {
PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
desc.setState(state);
try {
- ncs.getClusterController().registerPartitionProvider(desc);
+ ncs.getClusterController(ccId).registerPartitionProvider(desc);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 16e5027..fc2f8e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -71,7 +71,8 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
@Override
public void open() throws HyracksDataException {
- manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+ manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED,
+ false);
pendingConnection = true;
ensureConnected();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index f43dcbc..4969a85 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -18,8 +18,9 @@
*/
package org.apache.hyracks.control.nc.task;
-import org.apache.hyracks.util.ThreadDumpUtil;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -28,10 +29,12 @@ public class ThreadDumpTask implements Runnable {
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
private final String requestId;
+ private final CcId ccId;
- public ThreadDumpTask(NodeControllerService ncs, String requestId) {
+ public ThreadDumpTask(NodeControllerService ncs, String requestId, CcId ccId) {
this.ncs = ncs;
this.requestId = requestId;
+ this.ccId = ccId;
}
@Override
@@ -44,8 +47,7 @@ public class ThreadDumpTask implements Runnable {
result = null;
}
try {
- ncs.getClusterController().notifyThreadDump(
- ncs.getContext().getNodeId(), requestId, result);
+ ncs.getClusterController(ccId).notifyThreadDump(ncs.getContext().getNodeId(), requestId, result);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Exception sending thread dump to CC", e);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 6132639..68d677f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc.work;
import java.util.Collection;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -34,24 +35,29 @@ public class AbortAllJobsWork extends SynchronizableWork {
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
+ private final CcId ccId;
- public AbortAllJobsWork(NodeControllerService ncs) {
+ public AbortAllJobsWork(NodeControllerService ncs, CcId ccId) {
this.ncs = ncs;
+ this.ccId = ccId;
}
@Override
protected void doRun() throws Exception {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Aborting all tasks");
- }
+ LOGGER.info("Aborting all tasks for controller {}", ccId);
IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
- if (dpm != null) {
- ncs.getDatasetPartitionManager().abortAllReaders();
- } else {
+ if (dpm == null) {
LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
}
Collection<Joblet> joblets = ncs.getJobletMap().values();
for (Joblet ji : joblets) {
+ // TODO(mblow): should we have one jobletmap per cc?
+ if (!ji.getJobId().getCcId().equals(ccId)) {
+ continue;
+ }
+ if (dpm != null) {
+ dpm.abortReader(ji.getJobId());
+ }
Collection<Task> tasks = ji.getTaskMap().values();
for (Task task : tasks) {
task.abort();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
index 582f058..0dd5d4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
@@ -33,20 +34,21 @@ import org.apache.hyracks.control.nc.NodeControllerService;
public class BuildJobProfilesWork extends SynchronizableWork {
private final NodeControllerService ncs;
+ private final CcId ccId;
private final FutureValue<List<JobProfile>> fv;
- public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
+ public BuildJobProfilesWork(NodeControllerService ncs, CcId ccId, FutureValue<List<JobProfile>> fv) {
this.ncs = ncs;
+ this.ccId = ccId;
this.fv = fv;
}
@Override
protected void doRun() throws Exception {
- List<JobProfile> profiles = new ArrayList<JobProfile>();
+ List<JobProfile> profiles = new ArrayList<>();
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- for (Joblet ji : jobletMap.values()) {
- profiles.add(new JobProfile(ji.getJobId()));
- }
+ jobletMap.values().stream().filter(ji -> ji.getJobId().getCcId().equals(ccId))
+ .forEach(ji -> profiles.add(new JobProfile(ji.getJobId())));
for (JobProfile jProfile : profiles) {
Joblet ji;
JobletProfile jobletProfile = new JobletProfile(ncs.getId());