You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2017/11/15 10:32:07 UTC
asterixdb git commit: [NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized
Repository: asterixdb
Updated Branches:
refs/heads/master 8a7894f58 -> 893d385f7
[NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Report local max txn id after node registration.
- Add node status BOOTING.
- Distinguish between a node's first-time registration and
registration after a restart by using NodeStatus BOOTING,
so that the node receives the proper post-registration
tasks in response.
- Rename node status ALIVE -> ACTIVE.
- Rename StartupTask* to RegistrationTasks*
Change-Id: I6899c9e7d6e744ca92d0108556e086a23639d78b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2151
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/893d385f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/893d385f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/893d385f
Branch: refs/heads/master
Commit: 893d385f7132d002fb3219d1e55e03836920d61a
Parents: 8a7894f
Author: Murtadha Hubail <mh...@apache.org>
Authored: Wed Nov 15 02:35:03 2017 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Wed Nov 15 02:31:21 2017 -0800
----------------------------------------------------------------------
.../asterix/app/nc/NCAppRuntimeContext.java | 4 +-
.../app/nc/task/ReportLocalCountersTask.java | 40 +++++++++
.../app/nc/task/ReportMaxResourceIdTask.java | 40 ---------
.../replication/AutoFaultToleranceStrategy.java | 20 ++---
.../MetadataNodeFaultToleranceStrategy.java | 20 ++---
.../replication/NoFaultToleranceStrategy.java | 46 +++++++---
.../message/NCLifecycleTaskReportMessage.java | 2 +-
.../RegistrationTasksRequestMessage.java | 81 +++++++++++++++++
.../RegistrationTasksResponseMessage.java | 93 ++++++++++++++++++++
.../message/StartupTaskRequestMessage.java | 71 ---------------
.../message/StartupTaskResponseMessage.java | 93 --------------------
.../hyracks/bootstrap/NCApplication.java | 30 ++++---
.../common/replication/INCLifecycleMessage.java | 8 +-
.../message/ReportLocalCountersMessage.java | 74 ++++++++++++++++
.../ReportLocalCountersRequestMessage.java | 38 ++++++++
.../message/ReportMaxResourceIdMessage.java | 72 ---------------
.../ReportMaxResourceIdRequestMessage.java | 38 --------
.../message/ResourceIdRequestMessage.java | 2 +-
.../service/transaction/TxnIdFactory.java | 2 +-
.../apache/hyracks/api/client/NodeStatus.java | 3 +-
.../hyracks/control/cc/cluster/NodeManager.java | 2 +-
.../hyracks/hdfs/scheduler/SchedulerTest.java | 4 +-
.../apache/hyracks/test/support/TestUtils.java | 2 +-
23 files changed, 413 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7b08f68..e77d535 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -481,7 +481,9 @@ public class NCAppRuntimeContext implements INcApplicationContext {
@Override
public synchronized void unexportMetadataNodeStub() throws RemoteException {
- UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+ if (metadataNodeStub != null) {
+ UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+ }
metadataNodeStub = null;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
new file mode 100644
index 0000000..86f7d1c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReportLocalCountersTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ ReportLocalCountersMessage.send((NodeControllerService) cs);
+ }
+
+ @Override
+ public String toString() {
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
deleted file mode 100644
index 22d3cde..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportMaxResourceIdTask implements INCLifecycleTask {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void perform(IControllerService cs) throws HyracksDataException {
- ReportMaxResourceIdMessage.send((NodeControllerService) cs);
- }
-
- @Override
- public String toString() {
- return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 4ac1305..23f225e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -33,7 +33,7 @@ import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartFailbackTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
@@ -43,8 +43,8 @@ import org.apache.asterix.app.replication.message.CompleteFailbackResponseMessag
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
@@ -431,10 +431,10 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
@Override
public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
switch (message.getType()) {
- case STARTUP_TASK_REQUEST:
- process((StartupTaskRequestMessage) message);
+ case REGISTRATION_TASKS_REQUEST:
+ process((RegistrationTasksRequestMessage) message);
break;
- case STARTUP_TASK_RESULT:
+ case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
case TAKEOVER_PARTITION_RESPONSE:
@@ -483,7 +483,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
}
- private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
final SystemState state = msg.getState();
List<INCLifecycleTask> tasks;
@@ -493,7 +493,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
// failed node returned. Need to start failback process
tasks = buildFailbackStartupSequence();
}
- StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
@@ -504,7 +504,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
private List<INCLifecycleTask> buildFailbackStartupSequence() {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new StartFailbackTask());
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new StartLifecycleComponentsTask());
return tasks;
}
@@ -517,7 +517,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 1b57403..3341813 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -35,14 +35,14 @@ import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.RemoteRecoveryTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -123,10 +123,10 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate
@Override
public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
switch (message.getType()) {
- case STARTUP_TASK_REQUEST:
- process((StartupTaskRequestMessage) message);
+ case REGISTRATION_TASKS_REQUEST:
+ process((RegistrationTasksRequestMessage) message);
break;
- case STARTUP_TASK_RESULT:
+ case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
case REPLAY_LOGS_RESPONSE:
@@ -150,7 +150,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate
}
}
- private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
final SystemState state = msg.getState();
final boolean isParticipant = replicationStrategy.isParticipant(nodeId);
@@ -160,7 +160,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate
} else {
tasks = buildParticipantStartupSequence(nodeId, state);
}
- StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
@@ -199,7 +199,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate
tasks.add(rt);
}
tasks.add(new ExternalLibrarySetupTask(false));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
return tasks;
@@ -234,7 +234,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index b9ea135..a273845 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -32,11 +32,11 @@ import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -48,12 +48,13 @@ import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName());
- IClusterStateManager clusterManager;
+ private IClusterStateManager clusterManager;
private String metadataNodeId;
private Set<String> pendingStartupCompletionNodes = new HashSet<>();
private ICCMessageBroker messageBroker;
@@ -76,10 +77,10 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
@Override
public void process(INCLifecycleMessage message) throws HyracksDataException {
switch (message.getType()) {
- case STARTUP_TASK_REQUEST:
- process((StartupTaskRequestMessage) message);
+ case REGISTRATION_TASKS_REQUEST:
+ process((RegistrationTasksRequestMessage) message);
break;
- case STARTUP_TASK_RESULT:
+ case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
default:
@@ -100,10 +101,10 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
metadataNodeId = clusterManager.getCurrentMetadataNodeId();
}
- private void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
- List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState());
- StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
@@ -126,7 +127,16 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
}
}
- private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) {
+ private List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state);
+ final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+ if (nodeStatus == NodeStatus.ACTIVE) {
+ /*
+ * if the node status is already ACTIVE then it has completed
+ * booting and is just re-registering with a new/failed CC.
+ */
+ return buildActiveNCRegTasks(isMetadataNode);
+ }
final List<INCLifecycleTask> tasks = new ArrayList<>();
if (state == SystemState.CORRUPTED) {
//need to perform local recovery for node partitions
@@ -134,12 +144,11 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
.stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
tasks.add(rt);
}
- final boolean isMetadataNode = nodeId.equals(metadataNodeId);
if (isMetadataNode) {
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
@@ -147,4 +156,15 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
}
return tasks;
}
+
+ private List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ if (metadataNode) {
+ // need to unbind from old distributed state then rebind to new one
+ tasks.add(new BindMetadataNodeTask(false));
+ tasks.add(new BindMetadataNodeTask(true));
+ }
+ tasks.add(new ReportLocalCountersTask());
+ return tasks;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 2b32e1f..b654fd8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -58,6 +58,6 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAdd
@Override
public MessageType getType() {
- return MessageType.STARTUP_TASK_RESULT;
+ return MessageType.REGISTRATION_TASKS_RESULT;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
new file mode 100644
index 0000000..075c415
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
+
+ private static final Logger LOGGER = Logger.getLogger(RegistrationTasksRequestMessage.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final SystemState state;
+ private final String nodeId;
+ private final NodeStatus nodeStatus;
+
+ public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ this.state = state;
+ this.nodeId = nodeId;
+ this.nodeStatus = nodeStatus;
+ }
+
+ public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
+ throws HyracksDataException {
+ try {
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState);
+ ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Unable to send RegistrationTasksRequestMessage to CC", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ appCtx.getFaultToleranceStrategy().process(this);
+ }
+
+ public SystemState getState() {
+ return state;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public NodeStatus getNodeStatus() {
+ return nodeStatus;
+ }
+
+ @Override
+ public MessageType getType() {
+ return MessageType.REGISTRATION_TASKS_REQUEST;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
new file mode 100644
index 0000000..13525e3
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NCShutdownHook;
+import org.apache.hyracks.util.ExitUtil;
+
+public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
+
+ private static final Logger LOGGER = Logger.getLogger(RegistrationTasksResponseMessage.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final List<INCLifecycleTask> tasks;
+
+ public RegistrationTasksResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
+ this.nodeId = nodeId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ IControllerService cs = appCtx.getServiceContext().getControllerService();
+ boolean success = true;
+ try {
+ Throwable exception = null;
+ try {
+ for (INCLifecycleTask task : tasks) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Starting startup task: " + task);
+ }
+ task.perform(cs);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Completed startup task: " + task);
+ }
+ }
+ } catch (Throwable e) { //NOSONAR all startup failures should be reported to CC
+ LOGGER.log(Level.SEVERE, "Failed during startup task", e);
+ success = false;
+ exception = e;
+ }
+ NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
+ result.setException(exception);
+ try {
+ broker.sendMessageToCC(result);
+ } catch (Exception e) {
+ success = false;
+ LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+ }
+ } finally {
+ if (!success) {
+ // stop NC so that it can be started again
+ ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE);
+ }
+ }
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public MessageType getType() {
+ return MessageType.REGISTRATION_TASKS_RESPONSE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
deleted file mode 100644
index 21dee9c..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class StartupTaskRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
-
- private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
- private static final long serialVersionUID = 1L;
- private final SystemState state;
- private final String nodeId;
-
- public StartupTaskRequestMessage(String nodeId, SystemState state) {
- this.state = state;
- this.nodeId = nodeId;
- }
-
- public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException {
- try {
- StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState);
- ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- appCtx.getFaultToleranceStrategy().process(this);
- }
-
- public SystemState getState() {
- return state;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public MessageType getType() {
- return MessageType.STARTUP_TASK_REQUEST;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
deleted file mode 100644
index b941343..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NCShutdownHook;
-import org.apache.hyracks.util.ExitUtil;
-
-public class StartupTaskResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
-
- private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName());
- private static final long serialVersionUID = 1L;
- private final String nodeId;
- private final List<INCLifecycleTask> tasks;
-
- public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
- this.nodeId = nodeId;
- this.tasks = tasks;
- }
-
- @Override
- public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- IControllerService cs = appCtx.getServiceContext().getControllerService();
- boolean success = true;
- try {
- Throwable exception = null;
- try {
- for (INCLifecycleTask task : tasks) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "Starting startup task: " + task);
- }
- task.perform(cs);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "Completed startup task: " + task);
- }
- }
- } catch (Throwable e) { //NOSONAR all startup failures should be reported to CC
- LOGGER.log(Level.SEVERE, "Failed during startup task", e);
- success = false;
- exception = e;
- }
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
- result.setException(exception);
- try {
- broker.sendMessageToCC(result);
- } catch (Exception e) {
- success = false;
- LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
- }
- } finally {
- if (!success) {
- // stop NC so that it can be started again
- ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE);
- }
- }
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public MessageType getType() {
- return MessageType.STARTUP_TASK_RESPONSE;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 3d7f870..a18535d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -26,7 +26,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.AsterixExtension;
@@ -48,6 +48,7 @@ import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
@@ -67,7 +68,6 @@ public class NCApplication extends BaseNCApplication {
private String nodeId;
private boolean stopInitiated;
private boolean startupCompleted;
- private SystemState systemState;
protected WebManager webManager;
@Override
@@ -117,9 +117,8 @@ public class NCApplication extends BaseNCApplication {
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- systemState = recoveryMgr.getSystemState();
-
- if (systemState == SystemState.PERMANENT_DATA_LOSS) {
+ final SystemState stateOnStartup = recoveryMgr.getSystemState();
+ if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("System state: " + SystemState.PERMANENT_DATA_LOSS);
LOGGER.info("Node ID: " + nodeId);
@@ -187,20 +186,27 @@ public class NCApplication extends BaseNCApplication {
// Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
- if (systemState == SystemState.PERMANENT_DATA_LOSS
- && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
- systemState = SystemState.BOOTSTRAPPING;
+ IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+ SystemState state = recoveryMgr.getSystemState();
+ if (state == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties
+ .isVirtualNc())) {
+ state = SystemState.BOOTSTRAPPING;
}
- // Request startup tasks from CC
- StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+ // Request registration tasks from CC
+ RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+ NodeStatus.BOOTING, state);
startupCompleted = true;
}
@Override
public void onRegisterNode() throws Exception {
if (startupCompleted) {
- // Request startup tasks from CC
- StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+ /*
+ * If the node completed its startup before, then this is a re-registration with
+ * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE
+ */
+ RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+ NodeStatus.ACTIVE, SystemState.HEALTHY);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
index 87b0856..cb9fa8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -22,16 +22,16 @@ import org.apache.hyracks.api.messages.IMessage;
public interface INCLifecycleMessage extends IMessage {
- public enum MessageType {
+ enum MessageType {
REPLAY_LOGS_REQUEST,
REPLAY_LOGS_RESPONSE,
PREPARE_FAILBACK_REQUEST,
PREPARE_FAILBACK_RESPONSE,
COMPLETE_FAILBACK_REQUEST,
COMPLETE_FAILBACK_RESPONSE,
- STARTUP_TASK_REQUEST,
- STARTUP_TASK_RESPONSE,
- STARTUP_TASK_RESULT,
+ REGISTRATION_TASKS_REQUEST,
+ REGISTRATION_TASKS_RESPONSE,
+ REGISTRATION_TASKS_RESULT,
TAKEOVER_PARTITION_REQUEST,
TAKEOVER_PARTITION_RESPONSE,
TAKEOVER_METADATA_NODE_REQUEST,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
new file mode 100644
index 0000000..3f8ced8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * CC-addressed message carrying two counters from a reporting node: its maximum resource id
+ * and its maximum transaction id, together with the id of the node that sent them.
+ */
+public class ReportLocalCountersMessage implements ICcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(ReportLocalCountersMessage.class.getName());
+ private final long maxResourceId;
+ private final long maxTxnId;
+ private final String src;
+
+ /**
+ * @param src id of the reporting node
+ * @param maxResourceId max resource id to report
+ * @param maxTxnId max transaction id to report
+ */
+ public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) {
+ this.src = src;
+ this.maxResourceId = maxResourceId;
+ this.maxTxnId = maxTxnId;
+ }
+
+ /**
+ * Applies the reported counters: passes {@code maxTxnId} to
+ * {@code TxnIdFactory.ensureMinimumId} and reports {@code maxResourceId} for {@code src}
+ * to the resource id manager obtained from {@code appCtx}.
+ */
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+ TxnIdFactory.ensureMinimumId(maxTxnId);
+ resourceIdManager.report(src, maxResourceId);
+ }
+
+ /**
+ * Reads the local counters from the application context of {@code cs} and sends them to the CC.
+ *
+ * @param cs the node controller service whose counters are reported
+ * @throws HyracksDataException wrapping any exception raised while sending the message
+ */
+ public static void send(NodeControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = cs;
+ INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+ // The reported resource id is never below FIRST_AVAILABLE_USER_DATASET_ID.
+ long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+ long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+ ReportLocalCountersMessage countersMessage =
+ new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId);
+ try {
+ ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage);
+ } catch (Exception e) {
+ // Log first, then rethrow wrapped so the caller still sees the failure.
+ LOGGER.log(Level.SEVERE, "Unable to report local counters", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ /** Returns the simple class name; the counter values are not included. */
+ @Override
+ public String toString() {
+ return ReportLocalCountersMessage.class.getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
new file mode 100644
index 0000000..785ad2f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * NC-addressed message with no payload; handling it makes the receiving node send a
+ * {@code ReportLocalCountersMessage}.
+ */
+public class ReportLocalCountersRequestMessage implements INcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Delegates to {@code ReportLocalCountersMessage.send}, passing the controller service taken
+ * from {@code appCtx} cast to {@code NodeControllerService}.
+ */
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
+ }
+
+ /** Returns the simple class name. */
+ @Override
+ public String toString() {
+ return ReportLocalCountersRequestMessage.class.getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
deleted file mode 100644
index 277c0ba..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportMaxResourceIdMessage implements ICcAddressedMessage {
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
- private final long maxResourceId;
- private final String src;
-
- public ReportMaxResourceIdMessage(String src, long maxResourceId) {
- this.src = src;
- this.maxResourceId = maxResourceId;
- }
-
- public long getMaxResourceId() {
- return maxResourceId;
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- resourceIdManager.report(src, maxResourceId);
- }
-
- public static void send(NodeControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = cs;
- INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
- long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
- MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
- ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
- try {
- ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public String toString() {
- return ReportMaxResourceIdMessage.class.getSimpleName();
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
deleted file mode 100644
index a43376d..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportMaxResourceIdRequestMessage implements INcAddressedMessage {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ReportMaxResourceIdMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
- }
-
- @Override
- public String toString() {
- return ReportMaxResourceIdRequestMessage.class.getSimpleName();
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index decc1a9..a2f4aa1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -61,7 +61,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
ICCMessageBroker broker) throws Exception {
Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes();
- ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+ ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
for (String nodeId : getParticipantNodes) {
if (!resourceIdManager.reported(nodeId)) {
broker.sendApplicationMessageToNC(msg, nodeId);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
index 71d7f56..eb59e74 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
@@ -37,6 +37,6 @@ public class TxnIdFactory {
}
public static void ensureMinimumId(long id) {
- TxnIdFactory.id.set(id);
+ TxnIdFactory.id.updateAndGet(current -> Math.max(current, id));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
index b84f1f2..10a9a3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.client;
public enum NodeStatus {
- ALIVE,
+ ACTIVE,
+ BOOTING,
DEAD
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 3cd6235..4928564 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -136,7 +136,7 @@ public class NodeManager implements INodeManager {
public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
nodeRegistry.forEach(
- (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ALIVE, ncState.getDataPort(),
+ (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(),
ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
return result;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 445a15c..bb28c79 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -97,10 +97,10 @@ public class SchedulerTest extends TestCase {
Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
dataPort, resultPort, messagingPort);
ncNameToNcInfos.put("nc7",
- new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort),
+ new NodeControllerInfo("nc7", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.7", dataPort),
new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
ncNameToNcInfos.put("nc12",
- new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort),
+ new NodeControllerInfo("nc12", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.12", dataPort),
new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
InputSplit[] fileSplits = new InputSplit[12];
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/893d385f/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index c3d86e8..1814e85 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -135,7 +135,7 @@ public class TestUtils {
String ncId = ncNamePrefix + i;
String ncAddress = addressPrefix + i;
ncNameToNcInfos.put(ncId,
- new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
+ new NodeControllerInfo(ncId, NodeStatus.ACTIVE, new NetworkAddress(ncAddress, netPort),
new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2));
}
return ncNameToNcInfos;