You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/05/01 05:43:18 UTC
[2/2] hadoop git commit: HDDS-13. Refactor StorageContainerManager
into seperate RPC endpoints. Contributed by Anu Engineer.
HDDS-13. Refactor StorageContainerManager into seperate RPC endpoints. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0c3dc4c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0c3dc4c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0c3dc4c
Branch: refs/heads/trunk
Commit: f0c3dc4cf40575497ca6f29c037e43fa50e0ffdd
Parents: 2d319e3
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Apr 30 21:41:10 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Apr 30 21:41:10 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdds/scm/SCMMXBean.java | 50 -
.../org/apache/hadoop/hdds/scm/SCMStorage.java | 73 -
.../hdds/scm/StorageContainerManager.java | 1290 ------------------
.../scm/StorageContainerManagerHttpServer.java | 76 --
.../hadoop/hdds/scm/node/SCMNodeManager.java | 5 +-
.../hdds/scm/server/SCMBlockProtocolServer.java | 222 +++
.../scm/server/SCMClientProtocolServer.java | 314 +++++
.../scm/server/SCMDatanodeProtocolServer.java | 350 +++++
.../hadoop/hdds/scm/server/SCMMXBean.java | 50 +
.../hadoop/hdds/scm/server/SCMStorage.java | 73 +
.../scm/server/StorageContainerManager.java | 722 ++++++++++
.../StorageContainerManagerHttpServer.java | 77 ++
.../hadoop/hdds/scm/server/package-info.java | 22 +
.../TestStorageContainerManagerHttpServer.java | 7 +-
hadoop-ozone/common/src/main/bin/ozone | 2 +-
.../container/TestContainerStateManager.java | 29 +-
.../apache/hadoop/ozone/MiniOzoneCluster.java | 34 +-
.../hadoop/ozone/MiniOzoneClusterImpl.java | 10 +-
.../ozone/TestStorageContainerManager.java | 20 +-
.../TestStorageContainerManagerHelper.java | 2 +-
.../ozone/ksm/TestContainerReportWithKeys.java | 2 +-
.../hadoop/ozone/ksm/TestKeySpaceManager.java | 8 +-
.../org/apache/hadoop/ozone/scm/TestSCMCli.java | 13 +-
.../apache/hadoop/ozone/scm/TestSCMMXBean.java | 2 +-
.../apache/hadoop/ozone/scm/TestSCMMetrics.java | 16 +-
25 files changed, 1912 insertions(+), 1557 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMMXBean.java
deleted file mode 100644
index 17b6814..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMMXBean.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
-
-import java.util.Map;
-
-/**
- *
- * This is the JMX management interface for scm information.
- */
-@InterfaceAudience.Private
-public interface SCMMXBean extends ServiceRuntimeInfo {
-
- /**
- * Get the SCM RPC server port that used to listen to datanode requests.
- * @return SCM datanode RPC server port
- */
- String getDatanodeRpcPort();
-
- /**
- * Get the SCM RPC server port that used to listen to client requests.
- * @return SCM client RPC server port
- */
- String getClientRpcPort();
-
- /**
- * Get container report info that includes container IO stats of nodes.
- * @return The datanodeUUid to report json string mapping
- */
- Map<String, String> getContainerReport();
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMStorage.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMStorage.java
deleted file mode 100644
index 27e9363..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMStorage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
-import org.apache.hadoop.ozone.common.Storage;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
-import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
-
-/**
- * SCMStorage is responsible for management of the StorageDirectories used by
- * the SCM.
- */
-public class SCMStorage extends Storage {
-
- /**
- * Construct SCMStorage.
- * @throws IOException if any directories are inaccessible.
- */
- public SCMStorage(OzoneConfiguration conf) throws IOException {
- super(NodeType.SCM, getOzoneMetaDirPath(conf), STORAGE_DIR);
- }
-
- public void setScmId(String scmId) throws IOException {
- if (getState() == StorageState.INITIALIZED) {
- throw new IOException("SCM is already initialized.");
- } else {
- getStorageInfo().setProperty(SCM_ID, scmId);
- }
- }
-
- /**
- * Retrieves the SCM ID from the version file.
- * @return SCM_ID
- */
- public String getScmId() {
- return getStorageInfo().getProperty(SCM_ID);
- }
-
- @Override
- protected Properties getNodeProperties() {
- String scmId = getScmId();
- if (scmId == null) {
- scmId = UUID.randomUUID().toString();
- }
- Properties scmProperties = new Properties();
- scmProperties.setProperty(SCM_ID, scmId);
- return scmProperties;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
deleted file mode 100644
index ce0d4f8..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
+++ /dev/null
@@ -1,1290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.hdds.scm.block.BlockManager;
-import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.container.ContainerMapping;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
- .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos
- .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.Storage.StorageState;
-import org.apache.hadoop.ozone.common.StorageInfo;
-import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocolPB
- .ScmBlockLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
- .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB
- .StorageContainerLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CLIENT_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdds.protocol.proto
- .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
-import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-/**
- * StorageContainerManager is the main entry point for the service that provides
- * information about which SCM nodes host containers.
- *
- * DataNodes report to StorageContainerManager using heartbeat
- * messages. SCM allocates containers and returns a pipeline.
- *
- * A client once it gets a pipeline (a list of datanodes) will connect to the
- * datanodes and create a container, which then can be used to store data.
- */
-@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
-public class StorageContainerManager extends ServiceRuntimeInfoImpl
- implements StorageContainerDatanodeProtocol,
- StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(StorageContainerManager.class);
-
- /**
- * Startup options.
- */
- public enum StartupOption {
- INIT("-init"),
- CLUSTERID("-clusterid"),
- GENCLUSTERID("-genclusterid"),
- REGULAR("-regular"),
- HELP("-help");
-
- private final String name;
- private String clusterId = null;
-
- public void setClusterId(String cid) {
- if(cid != null && !cid.isEmpty()) {
- clusterId = cid;
- }
- }
-
- public String getClusterId() {
- return clusterId;
- }
-
- StartupOption(String arg) {
- this.name = arg;
- }
-
- public String getName() {
- return name;
- }
- }
-
- /**
- * NodeManager and container Managers for SCM.
- */
- private final NodeManager scmNodeManager;
- private final Mapping scmContainerManager;
- private final BlockManager scmBlockManager;
- private final SCMStorage scmStorage;
-
- /** The RPC server that listens to requests from DataNodes. */
- private final RPC.Server datanodeRpcServer;
- private final InetSocketAddress datanodeRpcAddress;
-
- /** The RPC server that listens to requests from clients. */
- private final RPC.Server clientRpcServer;
- private final InetSocketAddress clientRpcAddress;
-
- /** The RPC server that listens to requests from block service clients. */
- private final RPC.Server blockRpcServer;
- private final InetSocketAddress blockRpcAddress;
-
- private final StorageContainerManagerHttpServer httpServer;
-
- /** SCM mxbean. */
- private ObjectName scmInfoBeanName;
-
- /** SCM super user. */
- private final String scmUsername;
- private final Collection<String> scmAdminUsernames;
-
- /** SCM metrics. */
- private static SCMMetrics metrics;
- /** Key = DatanodeUuid, value = ContainerStat. */
- private Cache<String, ContainerStat> containerReportCache;
-
-
- private static final String USAGE =
- "Usage: \n ozone scm [genericOptions] "
- + "[ " + StartupOption.INIT.getName() + " [ "
- + StartupOption.CLUSTERID.getName() + " <cid> ] ]\n "
- + "ozone scm [genericOptions] [ "
- + StartupOption.GENCLUSTERID.getName() + " ]\n " +
- "ozone scm [ "
- + StartupOption.HELP.getName() + " ]\n";
- /**
- * Creates a new StorageContainerManager. Configuration will be updated with
- * information on the actual listening addresses used for RPC servers.
- *
- * @param conf configuration
- */
- private StorageContainerManager(OzoneConfiguration conf)
- throws IOException {
-
- final int handlerCount = conf.getInt(
- OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
- final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-
- StorageContainerManager.initMetrics();
- initContainerReportCache(conf);
-
- scmStorage = new SCMStorage(conf);
- if (scmStorage.getState() != StorageState.INITIALIZED) {
- throw new SCMException("SCM not initialized.",
- ResultCodes.SCM_NOT_INITIALIZED);
- }
- scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
- scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
- scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
- scmContainerManager, cacheSize);
-
- scmAdminUsernames = conf.getTrimmedStringCollection(
- OzoneConfigKeys.OZONE_ADMINISTRATORS);
- scmUsername = UserGroupInformation.getCurrentUser().getUserName();
- if (!scmAdminUsernames.contains(scmUsername)) {
- scmAdminUsernames.add(scmUsername);
- }
-
- RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
- ProtobufRpcEngine.class);
- RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
- ProtobufRpcEngine.class);
- RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
- ProtobufRpcEngine.class);
-
- BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
- StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
- new StorageContainerDatanodeProtocolServerSideTranslatorPB(this));
-
- final InetSocketAddress datanodeRpcAddr =
- HddsServerUtil.getScmDataNodeBindAddress(conf);
- datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
- StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
- handlerCount);
- datanodeRpcAddress = updateRPCListenAddress(conf,
- OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
-
- // SCM Container Service RPC
- BlockingService storageProtoPbService =
- StorageContainerLocationProtocolProtos
- .StorageContainerLocationProtocolService
- .newReflectiveBlockingService(
- new StorageContainerLocationProtocolServerSideTranslatorPB(this));
-
- final InetSocketAddress scmAddress =
- HddsServerUtil.getScmClientBindAddress(conf);
- clientRpcServer = startRpcServer(conf, scmAddress,
- StorageContainerLocationProtocolPB.class, storageProtoPbService,
- handlerCount);
- clientRpcAddress = updateRPCListenAddress(conf,
- OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
-
- // SCM Block Service RPC
- BlockingService blockProtoPbService =
- ScmBlockLocationProtocolProtos
- .ScmBlockLocationProtocolService
- .newReflectiveBlockingService(
- new ScmBlockLocationProtocolServerSideTranslatorPB(this));
-
- final InetSocketAddress scmBlockAddress =
- HddsServerUtil.getScmBlockClientBindAddress(conf);
- blockRpcServer = startRpcServer(conf, scmBlockAddress,
- ScmBlockLocationProtocolPB.class, blockProtoPbService,
- handlerCount);
- blockRpcAddress = updateRPCListenAddress(conf,
- OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
-
- httpServer = new StorageContainerManagerHttpServer(conf);
-
- registerMXBean();
- }
-
- /**
- * Initialize container reports cache that sent from datanodes.
- *
- * @param conf
- */
- private void initContainerReportCache(OzoneConfiguration conf) {
- containerReportCache = CacheBuilder.newBuilder()
- .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
- .maximumSize(Integer.MAX_VALUE)
- .removalListener(new RemovalListener<String, ContainerStat>() {
- @Override
- public void onRemoval(
- RemovalNotification<String, ContainerStat> removalNotification) {
- synchronized (containerReportCache) {
- ContainerStat stat = removalNotification.getValue();
- // remove invalid container report
- metrics.decrContainerStat(stat);
- LOG.debug(
- "Remove expired container stat entry for datanode: {}.",
- removalNotification.getKey());
- }
- }
- }).build();
- }
-
- /**
- * Builds a message for logging startup information about an RPC server.
- *
- * @param description RPC server description
- * @param addr RPC server listening address
- * @return server startup message
- */
- private static String buildRpcServerStartMessage(String description,
- InetSocketAddress addr) {
- return addr != null ? String.format("%s is listening at %s",
- description, addr.toString()) :
- String.format("%s not started", description);
- }
-
- /**
- * Starts an RPC server, if configured.
- *
- * @param conf configuration
- * @param addr configured address of RPC server
- * @param protocol RPC protocol provided by RPC server
- * @param instance RPC protocol implementation instance
- * @param handlerCount RPC server handler count
- *
- * @return RPC server
- * @throws IOException if there is an I/O error while creating RPC server
- */
- private static RPC.Server startRpcServer(OzoneConfiguration conf,
- InetSocketAddress addr, Class<?> protocol, BlockingService instance,
- int handlerCount)
- throws IOException {
- RPC.Server rpcServer = new RPC.Builder(conf)
- .setProtocol(protocol)
- .setInstance(instance)
- .setBindAddress(addr.getHostString())
- .setPort(addr.getPort())
- .setNumHandlers(handlerCount)
- .setVerbose(false)
- .setSecretManager(null)
- .build();
-
- DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
- return rpcServer;
- }
-
- private void registerMXBean() {
- Map<String, String> jmxProperties = new HashMap<>();
- jmxProperties.put("component", "ServerRuntime");
- this.scmInfoBeanName =
- MBeans.register("StorageContainerManager",
- "StorageContainerManagerInfo",
- jmxProperties,
- this);
- }
-
- private void unregisterMXBean() {
- if(this.scmInfoBeanName != null) {
- MBeans.unregister(this.scmInfoBeanName);
- this.scmInfoBeanName = null;
- }
- }
-
- /**
- * Main entry point for starting StorageContainerManager.
- *
- * @param argv arguments
- * @throws IOException if startup fails due to I/O error
- */
- public static void main(String[] argv) throws IOException {
- if (DFSUtil.parseHelpArgument(argv, USAGE,
- System.out, true)) {
- System.exit(0);
- }
- try {
- OzoneConfiguration conf = new OzoneConfiguration();
- GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
- if (!hParser.isParseSuccessful()) {
- System.err.println("USAGE: " + USAGE + "\n");
- hParser.printGenericCommandUsage(System.err);
- System.exit(1);
- }
- StringUtils.startupShutdownMessage(StorageContainerManager.class,
- argv, LOG);
- StorageContainerManager scm = createSCM(hParser.getRemainingArgs(), conf);
- if (scm != null) {
- scm.start();
- scm.join();
- }
- } catch (Throwable t) {
- LOG.error("Failed to start the StorageContainerManager.", t);
- terminate(1, t);
- }
- }
-
- private static void printUsage(PrintStream out) {
- out.println(USAGE + "\n");
- }
-
- public static StorageContainerManager createSCM(String[] argv,
- OzoneConfiguration conf) throws IOException {
- if (!HddsUtils.isHddsEnabled(conf)) {
- System.err.println("SCM cannot be started in secure mode or when " +
- OZONE_ENABLED + " is set to false");
- System.exit(1);
- }
- StartupOption startOpt = parseArguments(argv);
- if (startOpt == null) {
- printUsage(System.err);
- terminate(1);
- return null;
- }
- switch (startOpt) {
- case INIT:
- terminate(scmInit(conf) ? 0 : 1);
- return null;
- case GENCLUSTERID:
- System.out.println("Generating new cluster id:");
- System.out.println(StorageInfo.newClusterID());
- terminate(0);
- return null;
- case HELP:
- printUsage(System.err);
- terminate(0);
- return null;
- default:
- return new StorageContainerManager(conf);
- }
- }
-
- /**
- * Routine to set up the Version info for StorageContainerManager.
- *
- * @param conf OzoneConfiguration
- * @return true if SCM initialization is successful, false otherwise.
- * @throws IOException if init fails due to I/O error
- */
- public static boolean scmInit(OzoneConfiguration conf) throws IOException {
- SCMStorage scmStorage = new SCMStorage(conf);
- StorageState state = scmStorage.getState();
- if (state != StorageState.INITIALIZED) {
- try {
- String clusterId = StartupOption.INIT.getClusterId();
- if (clusterId != null && !clusterId.isEmpty()) {
- scmStorage.setClusterId(clusterId);
- }
- scmStorage.initialize();
- System.out.println("SCM initialization succeeded." +
- "Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
- + scmStorage.getClusterID());
- return true;
- } catch (IOException ioe) {
- LOG.error("Could not initialize SCM version file", ioe);
- return false;
- }
- } else {
- System.out.println("SCM already initialized. Reusing existing" +
- " cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
- + scmStorage.getClusterID());
- return true;
- }
- }
-
- private static StartupOption parseArguments(String[] args) {
- int argsLen = (args == null) ? 0 : args.length;
- StartupOption startOpt = StartupOption.HELP;
- if (argsLen == 0) {
- startOpt = StartupOption.REGULAR;
- }
- for (int i = 0; i < argsLen; i++) {
- String cmd = args[i];
- if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
- startOpt = StartupOption.INIT;
- if (argsLen > 3) {
- return null;
- }
- for (i = i + 1; i < argsLen; i++) {
- if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
- i++;
- if (i < argsLen && !args[i].isEmpty()) {
- startOpt.setClusterId(args[i]);
- } else {
- // if no cluster id specified or is empty string, return null
- LOG.error("Must specify a valid cluster ID after the "
- + StartupOption.CLUSTERID.getName() + " flag");
- return null;
- }
- } else {
- return null;
- }
- }
- } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
- if (argsLen > 1) {
- return null;
- }
- startOpt = StartupOption.GENCLUSTERID;
- }
- }
- return startOpt;
- }
-
- /**
- * Returns a SCMCommandRepose from the SCM Command.
- * @param cmd - Cmd
- * @return SCMCommandResponseProto
- * @throws InvalidProtocolBufferException
- */
- @VisibleForTesting
- public SCMCommandResponseProto getCommandResponse(SCMCommand cmd,
- final String datanodID)
- throws IOException {
- SCMCmdType type = cmd.getType();
- SCMCommandResponseProto.Builder builder =
- SCMCommandResponseProto.newBuilder()
- .setDatanodeUUID(datanodID);
- switch (type) {
- case registeredCommand:
- return builder.setCmdType(SCMCmdType.registeredCommand)
- .setRegisteredProto(
- SCMRegisteredCmdResponseProto.getDefaultInstance())
- .build();
- case versionCommand:
- return builder.setCmdType(SCMCmdType.versionCommand)
- .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
- .build();
- case sendContainerReport:
- return builder.setCmdType(SCMCmdType.sendContainerReport)
- .setSendReport(SendContainerReportProto.getDefaultInstance())
- .build();
- case reregisterCommand:
- return builder.setCmdType(SCMCmdType.reregisterCommand)
- .setReregisterProto(SCMReregisterCmdResponseProto
- .getDefaultInstance())
- .build();
- case deleteBlocksCommand:
- // Once SCM sends out the deletion message, increment the count.
- // this is done here instead of when SCM receives the ACK, because
- // DN might not be able to response the ACK for sometime. In case
- // it times out, SCM needs to re-send the message some more times.
- List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
- .stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
- this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
- return builder.setCmdType(SCMCmdType.deleteBlocksCommand)
- .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
- .build();
- case closeContainerCommand:
- return builder.setCmdType(SCMCmdType.closeContainerCommand)
- .setCloseContainerProto(((CloseContainerCommand)cmd).getProto())
- .build();
- default:
- throw new IllegalArgumentException("Not implemented");
- }
- }
-
- @VisibleForTesting
- public static SCMRegisteredCmdResponseProto getRegisteredResponse(
- SCMCommand cmd, SCMNodeAddressList addressList) {
- Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
- RegisteredCommand rCmd = (RegisteredCommand) cmd;
- SCMCmdType type = cmd.getType();
- if (type != SCMCmdType.registeredCommand) {
- throw new IllegalArgumentException("Registered command is not well " +
- "formed. Internal Error.");
- }
- return SCMRegisteredCmdResponseProto.newBuilder()
- //TODO : Fix this later when we have multiple SCM support.
- //.setAddressList(addressList)
- .setErrorCode(rCmd.getError())
- .setClusterID(rCmd.getClusterID())
- .setDatanodeUUID(rCmd.getDatanodeUUID()).build();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Pipeline getContainer(String containerName) throws IOException {
- checkAdminAccess();
- return scmContainerManager.getContainer(containerName).getPipeline();
- }
-
- @VisibleForTesting
- public ContainerInfo getContainerInfo(String containerName)
- throws IOException {
- return scmContainerManager.getContainer(containerName);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<ContainerInfo> listContainer(String startName,
- String prefixName, int count) throws IOException {
- return scmContainerManager.listContainer(startName, prefixName, count);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void deleteContainer(String containerName) throws IOException {
- checkAdminAccess();
- scmContainerManager.deleteContainer(containerName);
- }
-
- /**
- * Queries a list of Node Statuses.
- *
- * @param nodeStatuses
- * @param queryScope
- * @param poolName @return List of Datanodes.
- */
- @Override
- public HddsProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses,
- HddsProtos.QueryScope queryScope, String poolName) throws IOException {
-
- if (queryScope == HddsProtos.QueryScope.POOL) {
- throw new IllegalArgumentException("Not Supported yet");
- }
-
- List<DatanodeDetails> datanodes = queryNode(nodeStatuses);
- HddsProtos.NodePool.Builder poolBuilder =
- HddsProtos.NodePool.newBuilder();
-
- for (DatanodeDetails datanode : datanodes) {
- HddsProtos.Node node = HddsProtos.Node.newBuilder()
- .setNodeID(datanode.getProtoBufMessage())
- .addAllNodeStates(nodeStatuses)
- .build();
- poolBuilder.addNodes(node);
- }
-
- return poolBuilder.build();
- }
-
- /**
- * Notify from client when begin/finish operation for container/pipeline
- * objects on datanodes.
- * @param type
- * @param name
- * @param op
- * @param stage
- */
- @Override
- public void notifyObjectStageChange(
- ObjectStageChangeRequestProto.Type type, String name,
- ObjectStageChangeRequestProto.Op op,
- ObjectStageChangeRequestProto.Stage stage) throws IOException {
-
- LOG.info("Object type {} name {} op {} new stage {}",
- type, name, op, stage);
- if (type == ObjectStageChangeRequestProto.Type.container) {
- if (op == ObjectStageChangeRequestProto.Op.create) {
- if (stage == ObjectStageChangeRequestProto.Stage.begin) {
- scmContainerManager.updateContainerState(name,
- HddsProtos.LifeCycleEvent.CREATE);
- } else {
- scmContainerManager.updateContainerState(name,
- HddsProtos.LifeCycleEvent.CREATED);
- }
- } else if (op == ObjectStageChangeRequestProto.Op.close) {
- if (stage == ObjectStageChangeRequestProto.Stage.begin) {
- scmContainerManager.updateContainerState(name,
- HddsProtos.LifeCycleEvent.FINALIZE);
- } else {
- scmContainerManager.updateContainerState(name,
- HddsProtos.LifeCycleEvent.CLOSE);
- }
- }
- } //else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
- // TODO: pipeline state update will be addressed in future patch.
- //}
- }
-
- /**
- * Creates a replication pipeline of a specified type.
- */
- @Override
- public Pipeline createReplicationPipeline(
- HddsProtos.ReplicationType replicationType,
- HddsProtos.ReplicationFactor factor,
- HddsProtos.NodePool nodePool)
- throws IOException {
- // TODO: will be addressed in future patch.
- return null;
- }
-
- /**
- * Queries a list of Node that match a set of statuses.
- * <p>
- * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER,
- * then this call will return all healthy nodes which members in
- * Raft pipeline.
- * <p>
- * Right now we don't support operations, so we assume it is an AND operation
- * between the operators.
- *
- * @param nodeStatuses - A set of NodeStates.
- * @return List of Datanodes.
- */
-
- public List<DatanodeDetails> queryNode(EnumSet<NodeState> nodeStatuses) {
- Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null");
- Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " +
- "in the query set");
- List<DatanodeDetails> resultList = new LinkedList<>();
- Set<DatanodeDetails> currentSet = new TreeSet<>();
-
- for (NodeState nodeState : nodeStatuses) {
- Set<DatanodeDetails> nextSet = queryNodeState(nodeState);
- if ((nextSet == null) || (nextSet.size() == 0)) {
- // Right now we only support AND operation. So intersect with
- // any empty set is null.
- return resultList;
- }
- // First time we have to add all the elements, next time we have to
- // do an intersection operation on the set.
- if (currentSet.size() == 0) {
- currentSet.addAll(nextSet);
- } else {
- currentSet.retainAll(nextSet);
- }
- }
-
- resultList.addAll(currentSet);
- return resultList;
- }
-
- /**
- * Query the System for Nodes.
- *
- * @param nodeState - NodeState that we are interested in matching.
- * @return Set of Datanodes that match the NodeState.
- */
- private Set<DatanodeDetails> queryNodeState(NodeState nodeState) {
- if (nodeState == NodeState.RAFT_MEMBER ||
- nodeState == NodeState.FREE_NODE) {
- throw new IllegalStateException("Not implemented yet");
- }
- Set<DatanodeDetails> returnSet = new TreeSet<>();
- List<DatanodeDetails> tmp = getScmNodeManager().getNodes(nodeState);
- if ((tmp != null) && (tmp.size() > 0)) {
- returnSet.addAll(tmp);
- }
- return returnSet;
- }
-
- /**
- * Asks SCM where a container should be allocated. SCM responds with the set
- * of datanodes that should be used creating this container.
- *
- * @param containerName - Name of the container.
- * @param replicationFactor - replication factor.
- * @return pipeline
- * @throws IOException
- */
- @Override
- public Pipeline allocateContainer(HddsProtos.ReplicationType replicationType,
- HddsProtos.ReplicationFactor replicationFactor, String containerName,
- String owner) throws IOException {
-
- checkAdminAccess();
- return scmContainerManager
- .allocateContainer(replicationType, replicationFactor, containerName,
- owner).getPipeline();
- }
-
- /**
- * Returns listening address of StorageLocation Protocol RPC server.
- *
- * @return listen address of StorageLocation RPC server
- */
- @VisibleForTesting
- public InetSocketAddress getClientRpcAddress() {
- return clientRpcAddress;
- }
-
- @Override
- public String getClientRpcPort() {
- InetSocketAddress addr = getClientRpcAddress();
- return addr == null ? "0" : Integer.toString(addr.getPort());
- }
-
- /**
- * Returns listening address of StorageDatanode Protocol RPC server.
- *
- * @return Address where datanode are communicating.
- */
- public InetSocketAddress getDatanodeRpcAddress() {
- return datanodeRpcAddress;
- }
-
- @Override
- public String getDatanodeRpcPort() {
- InetSocketAddress addr = getDatanodeRpcAddress();
- return addr == null ? "0" : Integer.toString(addr.getPort());
- }
-
- /**
- * Start service.
- */
- public void start() throws IOException {
- LOG.info(buildRpcServerStartMessage(
- "StorageContainerLocationProtocol RPC server", clientRpcAddress));
- DefaultMetricsSystem.initialize("StorageContainerManager");
- clientRpcServer.start();
- LOG.info(buildRpcServerStartMessage(
- "ScmBlockLocationProtocol RPC server", blockRpcAddress));
- blockRpcServer.start();
- LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
- datanodeRpcAddress));
- datanodeRpcServer.start();
- httpServer.start();
- scmBlockManager.start();
-
- setStartTime();
-
- }
-
- /**
- * Stop service.
- */
- public void stop() {
- try {
- LOG.info("Stopping block service RPC server");
- blockRpcServer.stop();
- } catch (Exception ex) {
- LOG.error("Storage Container Manager blockRpcServer stop failed.", ex);
- }
-
- try {
- LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
- clientRpcServer.stop();
- } catch (Exception ex) {
- LOG.error("Storage Container Manager clientRpcServer stop failed.", ex);
- }
-
- try {
- LOG.info("Stopping the RPC server for DataNodes");
- datanodeRpcServer.stop();
- } catch (Exception ex) {
- LOG.error("Storage Container Manager datanodeRpcServer stop failed.", ex);
- }
-
- try {
- LOG.info("Stopping Storage Container Manager HTTP server.");
- httpServer.stop();
- } catch (Exception ex) {
- LOG.error("Storage Container Manager HTTP server stop failed.", ex);
- }
-
- try {
- LOG.info("Stopping Block Manager Service.");
- scmBlockManager.stop();
- } catch (Exception ex) {
- LOG.error("SCM block manager service stop failed.", ex);
- }
-
- if (containerReportCache != null) {
- containerReportCache.invalidateAll();
- containerReportCache.cleanUp();
- }
-
- if (metrics != null) {
- metrics.unRegister();
- }
-
- unregisterMXBean();
- IOUtils.cleanupWithLogger(LOG, scmContainerManager);
- IOUtils.cleanupWithLogger(LOG, scmNodeManager);
- }
-
- /**
- * Wait until service has completed shutdown.
- */
- public void join() {
- try {
- blockRpcServer.join();
- clientRpcServer.join();
- datanodeRpcServer.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.info("Interrupted during StorageContainerManager join.");
- }
- }
-
- /**
- * Returns SCM version.
- *
- * @return Version info.
- */
- @Override
- public SCMVersionResponseProto getVersion(
- SCMVersionRequestProto versionRequest) throws IOException {
- return getScmNodeManager().getVersion(versionRequest).getProtobufMessage();
- }
-
- /**
- * Used by data node to send a Heartbeat.
- *
- * @param datanodeDetails - Datanode Details.
- * @param nodeReport - Node Report
- * @param reportState - Container report ready info.
- * @return - SCMHeartbeatResponseProto
- * @throws IOException
- */
- @Override
- public SCMHeartbeatResponseProto sendHeartbeat(
- DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport,
- ReportState reportState) throws IOException {
- List<SCMCommand> commands =
- getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport,
- reportState);
- List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
- for (SCMCommand cmd : commands) {
- cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()
- .toString()));
- }
- return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
- .build();
- }
-
- /**
- * Register Datanode.
- *
- * @param datanodeDetails - DatanodID.
- * @param scmAddresses - List of SCMs this datanode is configured to
- * communicate.
- * @return SCM Command.
- */
- @Override
- public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
- register(DatanodeDetailsProto datanodeDetails, String[] scmAddresses) {
- // TODO : Return the list of Nodes that forms the SCM HA.
- return getRegisteredResponse(
- scmNodeManager.register(datanodeDetails), null);
- }
-
- /**
- * Send a container report.
- *
- * @param reports -- Container report
- * @return HeartbeatRespose.nullcommand.
- * @throws IOException
- */
- @Override
- public ContainerReportsResponseProto sendContainerReport(
- ContainerReportsRequestProto reports) throws IOException {
- updateContainerReportMetrics(reports);
-
- // should we process container reports async?
- scmContainerManager.processContainerReports(reports);
- return ContainerReportsResponseProto.newBuilder().build();
- }
-
- private void updateContainerReportMetrics(
- ContainerReportsRequestProto reports) {
- ContainerStat newStat = null;
- // TODO: We should update the logic once incremental container report
- // type is supported.
- if (reports
- .getType() == ContainerReportsRequestProto.reportType.fullReport) {
- newStat = new ContainerStat();
- for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
- .getReportsList()) {
- newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
- info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
- info.getReadCount(), info.getWriteCount()));
- }
-
- // update container metrics
- metrics.setLastContainerStat(newStat);
- }
-
- // Update container stat entry, this will trigger a removal operation if it
- // exists in cache.
- synchronized (containerReportCache) {
- String datanodeUuid = reports.getDatanodeDetails().getUuid();
- if (datanodeUuid != null && newStat != null) {
- containerReportCache.put(datanodeUuid, newStat);
- // update global view container metrics
- metrics.incrContainerStat(newStat);
- }
- }
- }
-
- /**
- * Handles the block deletion ACKs sent by datanodes. Once ACKs recieved,
- * SCM considers the blocks are deleted and update the metadata in SCM DB.
- *
- * @param acks
- * @return
- * @throws IOException
- */
- @Override
- public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
- ContainerBlocksDeletionACKProto acks) throws IOException {
- if (acks.getResultsCount() > 0) {
- List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
- for (DeleteBlockTransactionResult result : resultList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
- + "success={}", result.getTxID(), result.getSuccess());
- }
- if (result.getSuccess()) {
- LOG.debug("Purging TXID={} from block deletion log",
- result.getTxID());
- this.getScmBlockManager().getDeletedBlockLog()
- .commitTransactions(Collections.singletonList(result.getTxID()));
- } else {
- LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
- + "TX in next interval", result.getTxID());
- }
- }
- }
- return ContainerBlocksDeletionACKResponseProto.newBuilder()
- .getDefaultInstanceForType();
- }
-
- /**
- * Returns the Number of Datanodes that are communicating with SCM.
- *
- * @param nodestate Healthy, Dead etc.
- * @return int -- count
- */
- public int getNodeCount(NodeState nodestate) {
- return scmNodeManager.getNodeCount(nodestate);
- }
-
- /**
- * Returns SCM container manager.
- */
- @VisibleForTesting
- public Mapping getScmContainerManager() {
- return scmContainerManager;
- }
-
- /**
- * Returns node manager.
- * @return - Node Manager
- */
- @VisibleForTesting
- public NodeManager getScmNodeManager() {
- return scmNodeManager;
- }
-
- @VisibleForTesting
- public BlockManager getScmBlockManager() {
- return scmBlockManager;
- }
-
- /**
- * Get block locations.
- * @param keys batch of block keys to retrieve.
- * @return set of allocated blocks.
- * @throws IOException
- */
- @Override
- public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
- throws IOException {
- Set<AllocatedBlock> locatedBlocks = new HashSet<>();
- for (String key: keys) {
- Pipeline pipeline = scmBlockManager.getBlock(key);
- AllocatedBlock block = new AllocatedBlock.Builder()
- .setKey(key)
- .setPipeline(pipeline).build();
- locatedBlocks.add(block);
- }
- return locatedBlocks;
- }
-
- /**
- * Asks SCM where a block should be allocated. SCM responds with the set of
- * datanodes that should be used creating this block.
- *
- * @param size - size of the block.
- * @param type - Replication type.
- * @param factor
- * @return allocated block accessing info (key, pipeline).
- * @throws IOException
- */
- @Override
- public AllocatedBlock allocateBlock(long size,
- HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
- String owner) throws IOException {
- return scmBlockManager.allocateBlock(size, type, factor, owner);
- }
-
- /**
- * Get the clusterId and SCM Id from the version file in SCM.
- */
- @Override
- public ScmInfo getScmInfo() throws IOException {
- ScmInfo.Builder builder = new ScmInfo.Builder()
- .setClusterId(scmStorage.getClusterID())
- .setScmId(scmStorage.getScmId());
- return builder.build();
- }
- /**
- * Delete blocks for a set of object keys.
- *
- * @param keyBlocksInfoList list of block keys with object keys to delete.
- * @return deletion results.
- */
- public List<DeleteBlockGroupResult> deleteKeyBlocks(
- List<BlockGroup> keyBlocksInfoList) throws IOException {
- LOG.info("SCM is informed by KSM to delete {} blocks",
- keyBlocksInfoList.size());
- List<DeleteBlockGroupResult> results = new ArrayList<>();
- for (BlockGroup keyBlocks : keyBlocksInfoList) {
- Result resultCode;
- try {
- // We delete blocks in an atomic operation to prevent getting
- // into state like only a partial of blocks are deleted,
- // which will leave key in an inconsistent state.
- scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
- resultCode = Result.success;
- } catch (SCMException scmEx) {
- LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
- switch (scmEx.getResult()) {
- case CHILL_MODE_EXCEPTION:
- resultCode = Result.chillMode;
- break;
- case FAILED_TO_FIND_BLOCK:
- resultCode = Result.errorNotFound;
- break;
- default:
- resultCode = Result.unknownFailure;
- }
- } catch (IOException ex) {
- LOG.warn("Fail to delete blocks for object key: {}",
- keyBlocks.getGroupID(), ex);
- resultCode = Result.unknownFailure;
- }
- List<DeleteBlockResult> blockResultList = new ArrayList<>();
- for (String blockKey : keyBlocks.getBlockIDList()) {
- blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
- }
- results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
- blockResultList));
- }
- return results;
- }
-
- @VisibleForTesting
- public String getPpcRemoteUsername() {
- UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
- return user == null ? null : user.getUserName();
- }
-
- private void checkAdminAccess() throws IOException {
- String remoteUser = getPpcRemoteUsername();
- if(remoteUser != null) {
- if (!scmAdminUsernames.contains(remoteUser)) {
- throw new IOException(
- "Access denied for user " + remoteUser
- + ". Superuser privilege is required.");
- }
- }
- }
-
- /**
- * Initialize SCM metrics.
- */
- public static void initMetrics() {
- metrics = SCMMetrics.create();
- }
-
- /**
- * Return SCM metrics instance.
- */
- public static SCMMetrics getMetrics() {
- return metrics == null ? SCMMetrics.create() : metrics;
- }
-
- /**
- * Invalidate container stat entry for given datanode.
- *
- * @param datanodeUuid
- */
- public void removeContainerReport(String datanodeUuid) {
- synchronized (containerReportCache) {
- containerReportCache.invalidate(datanodeUuid);
- }
- }
-
- /**
- * Get container stat of specified datanode.
- *
- * @param datanodeUuid
- * @return
- */
- public ContainerStat getContainerReport(String datanodeUuid) {
- ContainerStat stat = null;
- synchronized (containerReportCache) {
- stat = containerReportCache.getIfPresent(datanodeUuid);
- }
-
- return stat;
- }
-
- /**
- * Returns a view of the container stat entries. Modifications made to the
- * map will directly affect the cache.
- *
- * @return
- */
- public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
- return containerReportCache.asMap();
- }
-
- @Override
- public Map<String, String> getContainerReport() {
- Map<String, String> id2StatMap = new HashMap<>();
- synchronized (containerReportCache) {
- ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap();
- for (Map.Entry<String, ContainerStat> entry : map.entrySet()) {
- id2StatMap.put(entry.getKey(), entry.getValue().toJsonString());
- }
- }
-
- return id2StatMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java
deleted file mode 100644
index 1ca059c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.server.BaseHttpServer;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-
-import java.io.IOException;
-
-/**
- * HttpServer2 wrapper for the Ozone Storage Container Manager.
- */
-public class StorageContainerManagerHttpServer extends BaseHttpServer {
-
- public StorageContainerManagerHttpServer(Configuration conf)
- throws IOException {
- super(conf, "scm");
- }
-
- @Override protected String getHttpAddressKey() {
- return ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY;
- }
-
- @Override protected String getHttpBindHostKey() {
- return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_KEY;
- }
-
- @Override protected String getHttpsAddressKey() {
- return ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY;
- }
-
- @Override protected String getHttpsBindHostKey() {
- return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_HOST_KEY;
- }
-
- @Override protected String getBindHostDefault() {
- return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_DEFAULT;
- }
-
- @Override protected int getHttpBindPortDefault() {
- return ScmConfigKeys.OZONE_SCM_HTTP_BIND_PORT_DEFAULT;
- }
-
- @Override protected int getHttpsBindPortDefault() {
- return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_PORT_DEFAULT;
- }
-
- @Override protected String getKeytabFile() {
- return ScmConfigKeys.OZONE_SCM_KEYTAB_FILE;
- }
-
- @Override protected String getSpnegoPrincipal() {
- return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
- }
-
- @Override protected String getEnabledKey() {
- return ScmConfigKeys.OZONE_SCM_HTTP_ENABLED_KEY;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 6857b11..c72e2a1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -849,10 +849,11 @@ public class SCMNodeManager
.setNodeReport(nodeReport)
.setContainerReportState(containerReportState)
.build());
+ return commandQueue.getCommand(datanodeDetails.getUuid());
} else {
LOG.error("Datanode ID in heartbeat is null");
}
- return commandQueue.getCommand(datanodeDetails.getUuid());
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
new file mode 100644
index 0000000..e0560a1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.protocolPB
+ .ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
+ .startRpcServer;
+
+/**
+ * SCM block protocol is the protocol used by Namenode and OzoneManager to get
+ * blocks from the SCM.
+ */
+public class SCMBlockProtocolServer implements ScmBlockLocationProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMBlockProtocolServer.class);
+
+ private final StorageContainerManager scm;
+ private final OzoneConfiguration conf;
+ private final RPC.Server blockRpcServer;
+ private final InetSocketAddress blockRpcAddress;
+
+ /**
+ * The RPC server that listens to requests from block service clients.
+ */
+ public SCMBlockProtocolServer(OzoneConfiguration conf,
+ StorageContainerManager scm) throws IOException {
+ this.scm = scm;
+ this.conf = conf;
+ final int handlerCount =
+ conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
+ OZONE_SCM_HANDLER_COUNT_DEFAULT);
+
+ RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+ ProtobufRpcEngine.class);
+ // SCM Block Service RPC
+ BlockingService blockProtoPbService =
+ ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
+ .newReflectiveBlockingService(
+ new ScmBlockLocationProtocolServerSideTranslatorPB(this));
+
+ final InetSocketAddress scmBlockAddress = HddsServerUtil
+ .getScmBlockClientBindAddress(conf);
+ blockRpcServer =
+ startRpcServer(
+ conf,
+ scmBlockAddress,
+ ScmBlockLocationProtocolPB.class,
+ blockProtoPbService,
+ handlerCount);
+ blockRpcAddress =
+ updateRPCListenAddress(
+ conf, OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress,
+ blockRpcServer);
+
+ }
+
+ public RPC.Server getBlockRpcServer() {
+ return blockRpcServer;
+ }
+
+ public InetSocketAddress getBlockRpcAddress() {
+ return blockRpcAddress;
+ }
+
+ public void start() {
+ LOG.info(
+ StorageContainerManager.buildRpcServerStartMessage(
+ "RPC server for Block Protocol", getBlockRpcAddress()));
+ getBlockRpcServer().start();
+ }
+
+ public void stop() {
+ try {
+ LOG.info("Stopping the RPC server for Block Protocol");
+ getBlockRpcServer().stop();
+ } catch (Exception ex) {
+ LOG.error("Block Protocol RPC stop failed.", ex);
+ }
+ IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+ }
+
+ public void join() throws InterruptedException {
+ LOG.trace("Join RPC server for Block Protocol");
+ getBlockRpcServer().join();
+ }
+
+ @Override
+ public Set<AllocatedBlock> getBlockLocations(Set<String> keys) throws
+ IOException {
+ Set<AllocatedBlock> locatedBlocks = new HashSet<>();
+ for (String key : keys) {
+ Pipeline pipeline = scm.getScmBlockManager().getBlock(key);
+ AllocatedBlock block = new AllocatedBlock.Builder().setKey(key)
+ .setPipeline(pipeline).build();
+ locatedBlocks.add(block);
+ }
+ return locatedBlocks;
+
+ }
+
+ @Override
+ public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
+ type, HddsProtos.ReplicationFactor factor, String owner) throws
+ IOException {
+ return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+ }
+
+ /**
+ * Delete blocks for a set of object keys.
+ *
+ * @param keyBlocksInfoList list of block keys with object keys to delete.
+ * @return deletion results.
+ */
+ @Override
+ public List<DeleteBlockGroupResult> deleteKeyBlocks(
+ List<BlockGroup> keyBlocksInfoList) throws IOException {
+ LOG.info("SCM is informed by KSM to delete {} blocks", keyBlocksInfoList
+ .size());
+ List<DeleteBlockGroupResult> results = new ArrayList<>();
+ for (BlockGroup keyBlocks : keyBlocksInfoList) {
+ ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode;
+ try {
+ // We delete blocks in an atomic operation to prevent getting
+ // into state like only a partial of blocks are deleted,
+ // which will leave key in an inconsistent state.
+ scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList());
+ resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+ .Result.success;
+ } catch (SCMException scmEx) {
+ LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
+ switch (scmEx.getResult()) {
+ case CHILL_MODE_EXCEPTION:
+ resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+ .Result.chillMode;
+ break;
+ case FAILED_TO_FIND_BLOCK:
+ resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+ .Result.errorNotFound;
+ break;
+ default:
+ resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+ .Result.unknownFailure;
+ }
+ } catch (IOException ex) {
+ LOG.warn("Fail to delete blocks for object key: {}", keyBlocks
+ .getGroupID(), ex);
+ resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult
+ .Result.unknownFailure;
+ }
+ List<DeleteBlockResult> blockResultList = new ArrayList<>();
+ for (String blockKey : keyBlocks.getBlockIDList()) {
+ blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
+ }
+ results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+ blockResultList));
+ }
+ return results;
+ }
+
+ @Override
+ public ScmInfo getScmInfo() throws IOException {
+ ScmInfo.Builder builder =
+ new ScmInfo.Builder()
+ .setClusterId(scm.getScmStorage().getClusterID())
+ .setScmId(scm.getScmStorage().getScmId());
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
new file mode 100644
index 0000000..42cce2f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocolPB
+ .StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerLocationProtocolProtos
+ .StorageContainerLocationProtocolService.newReflectiveBlockingService;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_CLIENT_ADDRESS_KEY;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .OZONE_SCM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
+ .startRpcServer;
+
+/**
+ * The RPC server that listens to requests from clients.
+ */
+public class SCMClientProtocolServer implements
+ StorageContainerLocationProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMClientProtocolServer.class);
+ private final RPC.Server clientRpcServer;
+ private final InetSocketAddress clientRpcAddress;
+ private final StorageContainerManager scm;
+ private final OzoneConfiguration conf;
+
+ public SCMClientProtocolServer(OzoneConfiguration conf,
+ StorageContainerManager scm) throws IOException {
+ this.scm = scm;
+ this.conf = conf;
+ final int handlerCount =
+ conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
+ OZONE_SCM_HANDLER_COUNT_DEFAULT);
+ RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ // SCM Container Service RPC
+ BlockingService storageProtoPbService =
+ newReflectiveBlockingService(
+ new StorageContainerLocationProtocolServerSideTranslatorPB(this));
+
+ final InetSocketAddress scmAddress = HddsServerUtil
+ .getScmClientBindAddress(conf);
+ clientRpcServer =
+ startRpcServer(
+ conf,
+ scmAddress,
+ StorageContainerLocationProtocolPB.class,
+ storageProtoPbService,
+ handlerCount);
+ clientRpcAddress =
+ updateRPCListenAddress(conf, OZONE_SCM_CLIENT_ADDRESS_KEY,
+ scmAddress, clientRpcServer);
+
+ }
+
+ public RPC.Server getClientRpcServer() {
+ return clientRpcServer;
+ }
+
+ public InetSocketAddress getClientRpcAddress() {
+ return clientRpcAddress;
+ }
+
+ public void start() {
+ LOG.info(
+ StorageContainerManager.buildRpcServerStartMessage(
+ "RPC server for Client ", getClientRpcAddress()));
+ getClientRpcServer().start();
+ }
+
+ public void stop() {
+ try {
+ LOG.info("Stopping the RPC server for Client Protocol");
+ getClientRpcServer().stop();
+ } catch (Exception ex) {
+ LOG.error("Client Protocol RPC stop failed.", ex);
+ }
+ IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+ }
+
+ public void join() throws InterruptedException {
+ LOG.trace("Join RPC server for Client Protocol");
+ getClientRpcServer().join();
+ }
+
+ @Override
+ public Pipeline allocateContainer(HddsProtos.ReplicationType
+ replicationType, HddsProtos.ReplicationFactor factor, String
+ containerName, String owner) throws IOException {
+ scm.checkAdminAccess();
+ return scm.getScmContainerManager()
+ .allocateContainer(replicationType, factor, containerName, owner)
+ .getPipeline();
+ }
+
+ @Override
+ public Pipeline getContainer(String containerName) throws IOException {
+ return scm.getScmContainerManager()
+ .getContainer(containerName).getPipeline();
+ }
+
+ @Override
+ public List<ContainerInfo> listContainer(String startName,
+ String prefixName, int count) throws IOException {
+ return scm.getScmContainerManager()
+ .listContainer(startName, prefixName, count);
+ }
+
+ @Override
+ public void deleteContainer(String containerName) throws IOException {
+ scm.checkAdminAccess();
+ scm.getScmContainerManager().deleteContainer(containerName);
+
+ }
+
+ @Override
+ public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+ nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) throws
+ IOException {
+
+ if (queryScope == HddsProtos.QueryScope.POOL) {
+ throw new IllegalArgumentException("Not Supported yet");
+ }
+
+ List<DatanodeDetails> datanodes = queryNode(nodeStatuses);
+ HddsProtos.NodePool.Builder poolBuilder = HddsProtos.NodePool.newBuilder();
+
+ for (DatanodeDetails datanode : datanodes) {
+ HddsProtos.Node node =
+ HddsProtos.Node.newBuilder()
+ .setNodeID(datanode.getProtoBufMessage())
+ .addAllNodeStates(nodeStatuses)
+ .build();
+ poolBuilder.addNodes(node);
+ }
+
+ return poolBuilder.build();
+
+ }
+
+ @Override
+ public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Type type, String name,
+ StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op
+ op, StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Stage stage) throws IOException {
+
+ LOG.info("Object type {} name {} op {} new stage {}", type, name, op,
+ stage);
+ if (type == StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Type.container) {
+ if (op == StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Op.create) {
+ if (stage == StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Stage.begin) {
+ scm.getScmContainerManager().updateContainerState(name, HddsProtos
+ .LifeCycleEvent.CREATE);
+ } else {
+ scm.getScmContainerManager().updateContainerState(name, HddsProtos
+ .LifeCycleEvent.CREATED);
+ }
+ } else {
+ if (op == StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Op.close) {
+ if (stage == StorageContainerLocationProtocolProtos
+ .ObjectStageChangeRequestProto.Stage.begin) {
+ scm.getScmContainerManager().updateContainerState(name, HddsProtos
+ .LifeCycleEvent.FINALIZE);
+ } else {
+ scm.getScmContainerManager().updateContainerState(name, HddsProtos
+ .LifeCycleEvent.CLOSE);
+ }
+ }
+ }
+ } // else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
+ // TODO: pipeline state update will be addressed in future patch.
+ // }
+
+ }
+
+ @Override
+ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
+ throws IOException {
+ // TODO: will be addressed in future patch.
+ // This is needed only for debugging purposes to make sure cluster is
+ // working correctly.
+ return null;
+ }
+
+ @Override
+ public ScmInfo getScmInfo() throws IOException {
+ ScmInfo.Builder builder =
+ new ScmInfo.Builder()
+ .setClusterId(scm.getScmStorage().getClusterID())
+ .setScmId(scm.getScmStorage().getScmId());
+ return builder.build();
+ }
+
+ /**
+ * Queries a list of Node that match a set of statuses.
+ *
+ * <p>For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, then
+ * this call will return all
+ * healthy nodes which members in Raft pipeline.
+ *
+ * <p>Right now we don't support operations, so we assume it is an AND
+ * operation between the
+ * operators.
+ *
+ * @param nodeStatuses - A set of NodeStates.
+ * @return List of Datanodes.
+ */
+ public List<DatanodeDetails> queryNode(EnumSet<HddsProtos.NodeState>
+ nodeStatuses) {
+ Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null");
+ Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " +
+ "in the query set");
+ List<DatanodeDetails> resultList = new LinkedList<>();
+ Set<DatanodeDetails> currentSet = new TreeSet<>();
+
+ for (HddsProtos.NodeState nodeState : nodeStatuses) {
+ Set<DatanodeDetails> nextSet = queryNodeState(nodeState);
+ if ((nextSet == null) || (nextSet.size() == 0)) {
+ // Right now we only support AND operation. So intersect with
+ // any empty set is null.
+ return resultList;
+ }
+ // First time we have to add all the elements, next time we have to
+ // do an intersection operation on the set.
+ if (currentSet.size() == 0) {
+ currentSet.addAll(nextSet);
+ } else {
+ currentSet.retainAll(nextSet);
+ }
+ }
+
+ resultList.addAll(currentSet);
+ return resultList;
+ }
+
+ /**
+ * Query the System for Nodes.
+ *
+ * @param nodeState - NodeState that we are interested in matching.
+ * @return Set of Datanodes that match the NodeState.
+ */
+ private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) {
+ if (nodeState == HddsProtos.NodeState.RAFT_MEMBER || nodeState ==
+ HddsProtos.NodeState
+ .FREE_NODE) {
+ throw new IllegalStateException("Not implemented yet");
+ }
+ Set<DatanodeDetails> returnSet = new TreeSet<>();
+ List<DatanodeDetails> tmp = scm.getScmNodeManager().getNodes(nodeState);
+ if ((tmp != null) && (tmp.size() > 0)) {
+ returnSet.addAll(tmp);
+ }
+ return returnSet;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org