You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/02/15 02:37:01 UTC

asterixdb git commit: [NO ISSUE] Add support for NodeGroup upsert, etc.

Repository: asterixdb
Updated Branches:
  refs/heads/master ca8d0830e -> 15924f3cc


[NO ISSUE] Add support for NodeGroup upsert, etc.

Also, enable resource id generation when the only nodes absent from the cluster
are those pending removal

Change-Id: I15cfb74bc345680102cedafa99f7ff4f144860bc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2389
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Integration-Tests: Murtadha Hubail <mh...@apache.org>
Tested-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/15924f3c
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/15924f3c
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/15924f3c

Branch: refs/heads/master
Commit: 15924f3cc5735a8d88ab1b9da1384691cc48a95b
Parents: ca8d083
Author: Michael Blow <mb...@apache.org>
Authored: Wed Feb 14 20:34:10 2018 -0500
Committer: Michael Blow <mb...@apache.org>
Committed: Wed Feb 14 18:36:11 2018 -0800

----------------------------------------------------------------------
 .../api/http/server/ClusterApiServlet.java        |  2 +-
 .../common/cluster/IClusterStateManager.java      |  2 +-
 .../apache/asterix/metadata/MetadataCache.java    |  8 ++------
 .../apache/asterix/metadata/MetadataManager.java  | 13 ++++++++++++-
 .../org/apache/asterix/metadata/MetadataNode.java |  5 +++--
 .../metadata/MetadataTransactionContext.java      |  2 +-
 .../asterix/metadata/api/IMetadataManager.java    | 15 +++++++++++++--
 .../asterix/metadata/api/IMetadataNode.java       |  6 +++++-
 .../asterix/metadata/entities/NodeGroup.java      |  2 +-
 .../runtime/message/ResourceIdRequestMessage.java | 18 +++++++++---------
 .../runtime/transaction/ResourceIdManager.java    |  3 ++-
 .../runtime/utils/ClusterStateManager.java        |  3 ++-
 .../hyracks/control/cc/cluster/NodeManager.java   |  4 ++--
 13 files changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index d239038..a3ad089 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -163,7 +163,7 @@ public class ClusterApiServlet extends AbstractServlet {
     private void processPartitionMaster(IServletRequest request, IServletResponse response) {
         final String partition = request.getParameter("partition");
         final String node = request.getParameter("node");
-        appCtx.getClusterStateManager().updateClusterPartition(Integer.valueOf(partition), node, true);
+        appCtx.getClusterStateManager().updateClusterPartition(Integer.parseInt(partition), node, true);
         response.setStatus(HttpResponseStatus.OK);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0a5707e..dda9ffd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -66,7 +66,7 @@ public interface IClusterStateManager {
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
      */
-    void updateClusterPartition(Integer partitionNum, String activeNode, boolean active);
+    void updateClusterPartition(int partitionNum, String activeNode, boolean active);
 
     /**
      * Updates the metadata node id and its state.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index a12b079..b994c50 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -21,7 +21,6 @@ package org.apache.asterix.metadata;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -191,12 +190,9 @@ public class MetadataCache {
         }
     }
 
-    public NodeGroup addNodeGroupIfNotExists(NodeGroup nodeGroup) {
+    public NodeGroup addOrUpdateNodeGroup(NodeGroup nodeGroup) {
         synchronized (nodeGroups) {
-            if (!nodeGroups.containsKey(nodeGroup.getNodeGroupName())) {
-                return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup);
-            }
-            return null;
+            return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b2757f2..a5e3a84 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -57,6 +57,7 @@ import org.apache.asterix.metadata.entities.Library;
 import org.apache.asterix.metadata.entities.Node;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -480,8 +481,18 @@ public abstract class MetadataManager implements IMetadataManager {
 
     @Override
     public void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException {
+        modifyNodegroup(ctx, nodeGroup, Operation.INSERT);
+    }
+
+    @Override
+    public void upsertNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException {
+        modifyNodegroup(ctx, nodeGroup, Operation.UPSERT);
+    }
+
+    public void modifyNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup, Operation op)
+            throws AlgebricksException {
         try {
-            metadataNode.addNodeGroup(ctx.getTxnId(), nodeGroup);
+            metadataNode.modifyNodeGroup(ctx.getTxnId(), nodeGroup, op);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 966c99a..64d0389 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -404,11 +404,12 @@ public class MetadataNode implements IMetadataNode {
     }
 
     @Override
-    public void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException {
+    public void modifyNodeGroup(TxnId txnId, NodeGroup nodeGroup, Operation modificationOp)
+            throws AlgebricksException, RemoteException {
         try {
             NodeGroupTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getNodeGroupTupleTranslator(true);
             ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(nodeGroup);
-            insertTupleIntoIndex(txnId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple);
+            modifyMetadataIndex(modificationOp, txnId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple);
         } catch (HyracksDataException e) {
             if (e.getComponent().equals(ErrorCode.HYRACKS) && e.getErrorCode() == ErrorCode.DUPLICATE_KEY) {
                 throw new AlgebricksException(

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index cb67ee5..961a1ee 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -152,7 +152,7 @@ public class MetadataTransactionContext extends MetadataCache {
 
     public void dropNodeGroup(String nodeGroupName) {
         NodeGroup nodeGroup = new NodeGroup(nodeGroupName, null);
-        droppedCache.addNodeGroupIfNotExists(nodeGroup);
+        droppedCache.addOrUpdateNodeGroup(nodeGroup);
         logAndApply(new MetadataLogicalOperation(nodeGroup, false));
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index e030db3..ff349e1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -320,18 +320,29 @@ public interface IMetadataManager extends IMetadataBootstrap {
             throws AlgebricksException;
 
     /**
-     * Inserts a node group.
+     * Inserts a new node group.
      *
      * @param ctx
      *            MetadataTransactionContext of an active metadata transaction.
      * @param nodeGroup
      *            Node group instance to insert.
      * @throws AlgebricksException
-     *             For example, if the node group already exists.
+     *             For example, if the node group already exists
      */
     void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException;
 
     /**
+     * Inserts a new (or updates an existing) node group.
+     *
+     * @param ctx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param nodeGroup
+     *            Node group instance to insert or update.
+     * @throws AlgebricksException
+     */
+    void upsertNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException;
+
+    /**
      * Retrieves a node group.
      *
      * @param ctx

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index f6abc53..dc23db4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -41,6 +41,7 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.Library;
 import org.apache.asterix.metadata.entities.Node;
 import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 /**
@@ -340,11 +341,14 @@ public interface IMetadataNode extends Remote, Serializable {
      *            A globally unique id for an active metadata transaction.
      * @param nodeGroup
      *            Node group instance to insert.
+     * @param modificationOp
      * @throws AlgebricksException
      *             For example, if the node group already exists.
      * @throws RemoteException
      */
-    void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException;
+    void modifyNodeGroup(TxnId txnId, NodeGroup nodeGroup,
+            AbstractIndexModificationOperationCallback.Operation modificationOp)
+            throws AlgebricksException, RemoteException;
 
     /**
      * Retrieves a node group, acquiring local locks on behalf of the given

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
index e5088aa..8a7d5ec 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
@@ -50,7 +50,7 @@ public class NodeGroup implements IMetadataEntity<NodeGroup> {
 
     @Override
     public NodeGroup addToCache(MetadataCache cache) {
-        return cache.addNodeGroupIfNotExists(this);
+        return cache.addOrUpdateNodeGroup(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 3e172c7..087913f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -41,16 +41,16 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
             ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
             ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
             IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
-            if (!clusterStateManager.isClusterActive()) {
-                response.setResourceId(-1);
-                response.setException(new Exception("Cannot generate global resource id when cluster is not active."));
-            } else {
-                IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-                response.setResourceId(resourceIdManager.createResourceId());
-                if (response.getResourceId() < 0) {
+            IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+            response.setResourceId(resourceIdManager.createResourceId());
+            if (response.getResourceId() < 0) {
+                if (!(clusterStateManager.isClusterActive())) {
+                    response.setException(
+                            new Exception("Cannot generate global resource id when cluster is not active."));
+                } else {
                     response.setException(new Exception("One or more nodes has not reported max resource id."));
+                    requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
                 }
-                requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
             }
             broker.sendApplicationMessageToNC(response, src);
         } catch (Exception e) {
@@ -60,7 +60,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
 
     private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
             ICCMessageBroker broker) throws Exception {
-        Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes();
+        Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(true);
         ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
         for (String nodeId : getParticipantNodes) {
             if (!resourceIdManager.reported(nodeId)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 0bb862d..6d3077e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -38,7 +38,8 @@ public class ResourceIdManager implements IResourceIdManager {
 
     @Override
     public long createResourceId() {
-        return csm.isClusterActive() ? globalResourceId.incrementAndGet() : -1;
+        return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true))
+                ? globalResourceId.incrementAndGet() : -1;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 76668d2..03a6868 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -150,7 +150,7 @@ public class ClusterStateManager implements IClusterStateManager {
     }
 
     @Override
-    public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) {
+    public synchronized void updateClusterPartition(int partitionNum, String activeNode, boolean active) {
         ClusterPartition clusterPartition = clusterPartitions.get(partitionNum);
         if (clusterPartition != null) {
             // set the active node for this node's partitions
@@ -159,6 +159,7 @@ public class ClusterStateManager implements IClusterStateManager {
                 clusterPartition.setActiveNodeId(activeNode);
                 clusterPartition.setPendingActivation(false);
             }
+            notifyAll();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8f73864..77e7bf7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -101,7 +101,7 @@ public class NodeManager implements INodeManager {
         // Updates the node registry.
         if (nodeRegistry.containsKey(nodeId)) {
             LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
-            failNonDeadNode(nodeId);
+            failNode(nodeId);
         } else {
             try {
                 // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
@@ -173,7 +173,7 @@ public class NodeManager implements INodeManager {
         return Pair.of(deadNodes, affectedJobIds);
     }
 
-    private void failNonDeadNode(String nodeId) throws HyracksException {
+    public void failNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.