You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/05/04 19:27:45 UTC
[17/50] [abbrv] hadoop git commit: HDDS-13. Refactor
StorageContainerManager into seperate RPC endpoints. Contributed by Anu
Engineer.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
new file mode 100644
index 0000000..e42b887
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.sendContainerReport;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
+
+
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.ozone.protocolPB
+ .StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
+
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+
+/**
+ * Protocol Handler for Datanode Protocol.
+ */
+public class SCMDatanodeProtocolServer implements
+ StorageContainerDatanodeProtocol {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SCMDatanodeProtocolServer.class);
+
+ /**
+ * The RPC server that listens to requests from DataNodes.
+ */
+ private final RPC.Server datanodeRpcServer;
+
+ private final StorageContainerManager scm;
+ private final InetSocketAddress datanodeRpcAddress;
+
+ public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
+ StorageContainerManager scm) throws IOException {
+
+ Preconditions.checkNotNull(scm, "SCM cannot be null");
+ this.scm = scm;
+ final int handlerCount =
+ conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
+ OZONE_SCM_HANDLER_COUNT_DEFAULT);
+
+ RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ BlockingService dnProtoPbService =
+ StorageContainerDatanodeProtocolProtos
+ .StorageContainerDatanodeProtocolService
+ .newReflectiveBlockingService(
+ new StorageContainerDatanodeProtocolServerSideTranslatorPB(
+ this));
+
+ InetSocketAddress datanodeRpcAddr =
+ HddsServerUtil.getScmDataNodeBindAddress(conf);
+
+ datanodeRpcServer =
+ startRpcServer(
+ conf,
+ datanodeRpcAddr,
+ StorageContainerDatanodeProtocolPB.class,
+ dnProtoPbService,
+ handlerCount);
+
+ datanodeRpcAddress =
+ updateRPCListenAddress(
+ conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
+ datanodeRpcServer);
+ }
+
+ /**
+  * Returns the datanode RPC address that was recorded when this server was
+  * constructed.
+  *
+  * @return address stored for the datanode RPC server
+  */
+ public InetSocketAddress getDatanodeRpcAddress() {
+ return datanodeRpcAddress;
+ }
+
+ /**
+  * Returns the RPC server that listens to requests from DataNodes.
+  *
+  * @return the underlying datanode RPC server
+  */
+ public RPC.Server getDatanodeRpcServer() {
+ return datanodeRpcServer;
+ }
+
+ /**
+  * Answers a datanode's version request by delegating to the SCM node
+  * manager and returning the protobuf form of its reply.
+  *
+  * @param versionRequest version request received from the datanode
+  * @return protobuf message obtained from the node manager's response
+  * @throws IOException as declared by the protocol interface
+  */
+ @Override
+ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
+ versionRequest)
+ throws IOException {
+ return scm.getScmNodeManager().getVersion(versionRequest)
+ .getProtobufMessage();
+ }
+
+ /**
+  * Handles a datanode heartbeat: forwards it to the node manager and wraps
+  * every command the node manager hands back into the heartbeat response.
+  *
+  * @param datanodeDetails identity of the reporting datanode
+  * @param nodeReport node report carried by the heartbeat
+  * @param reportState report state carried by the heartbeat
+  * @return heartbeat response holding one entry per returned command
+  * @throws IOException if a command cannot be converted to its response
+  */
+ @Override
+ public SCMHeartbeatResponseProto sendHeartbeat(
+     HddsProtos.DatanodeDetailsProto datanodeDetails,
+     StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport,
+     StorageContainerDatanodeProtocolProtos.ReportState reportState)
+     throws IOException {
+   final List<SCMCommand> pending = scm.getScmNodeManager()
+       .sendHeartbeat(datanodeDetails, nodeReport, reportState);
+   final SCMHeartbeatResponseProto.Builder response =
+       SCMHeartbeatResponseProto.newBuilder();
+   for (SCMCommand command : pending) {
+     response.addCommands(
+         getCommandResponse(command, datanodeDetails.getUuid()));
+   }
+   return response.build();
+ }
+
+ /**
+  * Registers a datanode with the SCM node manager and converts the
+  * resulting command into the registration response.
+  *
+  * @param datanodeDetails identity of the datanode being registered
+  * @param scmAddresses not used by this implementation
+  * @return registration response built from the node manager's command
+  * @throws IOException as declared by the protocol interface
+  */
+ @Override
+ public SCMRegisteredCmdResponseProto register(
+     HddsProtos.DatanodeDetailsProto datanodeDetails, String[] scmAddresses)
+     throws IOException {
+   final SCMCommand registration =
+       scm.getScmNodeManager().register(datanodeDetails);
+   // TODO : Return the list of Nodes that forms the SCM HA.
+   return getRegisteredResponse(registration, null);
+ }
+
+ /**
+  * Converts a {@code RegisteredCommand} into its protobuf response.
+  *
+  * @param cmd command to convert; its class must be exactly
+  *     {@code RegisteredCommand} and its type {@code registeredCommand}
+  * @param addressList currently ignored (see the TODO in the body)
+  * @return response carrying the error code, cluster ID and datanode UUID
+  * @throws IllegalStateException if {@code cmd} is not a RegisteredCommand
+  * @throws IllegalArgumentException if the command type does not match
+  */
+ @VisibleForTesting
+ public static SCMRegisteredCmdResponseProto getRegisteredResponse(
+     SCMCommand cmd,
+     StorageContainerDatanodeProtocolProtos.SCMNodeAddressList addressList) {
+   Preconditions.checkState(RegisteredCommand.class == cmd.getClass());
+   final RegisteredCommand registered = (RegisteredCommand) cmd;
+   if (cmd.getType() != SCMCmdType.registeredCommand) {
+     throw new IllegalArgumentException(
+         "Registered command is not well formed. Internal Error.");
+   }
+   final SCMRegisteredCmdResponseProto.Builder response =
+       SCMRegisteredCmdResponseProto.newBuilder();
+   // TODO : Fix this later when we have multiple SCM support.
+   // response.setAddressList(addressList);
+   response.setErrorCode(registered.getError());
+   response.setClusterID(registered.getClusterID());
+   response.setDatanodeUUID(registered.getDatanodeUUID());
+   return response.build();
+ }
+
+ /**
+  * Accepts a container report from a datanode: updates the container
+  * report metrics first, then hands the report to the SCM container
+  * manager for processing on the calling thread.
+  *
+  * @param reports container report sent by the datanode
+  * @return an empty container report response
+  * @throws IOException as declared by the protocol interface
+  */
+ @Override
+ public ContainerReportsResponseProto sendContainerReport(
+ ContainerReportsRequestProto reports)
+ throws IOException {
+ updateContainerReportMetrics(reports);
+
+ // should we process container reports async?
+ scm.getScmContainerManager().processContainerReports(reports);
+ return ContainerReportsResponseProto.newBuilder().build();
+ }
+
+ private void updateContainerReportMetrics(
+ ContainerReportsRequestProto reports) {
+ ContainerStat newStat = null;
+ // TODO: We should update the logic once incremental container report
+ // type is supported.
+ if (reports
+ .getType() == StorageContainerDatanodeProtocolProtos
+ .ContainerReportsRequestProto.reportType.fullReport) {
+ newStat = new ContainerStat();
+ for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
+ .getReportsList()) {
+ newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
+ info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
+ info.getReadCount(), info.getWriteCount()));
+ }
+
+ // update container metrics
+ StorageContainerManager.getMetrics().setLastContainerStat(newStat);
+ }
+
+ // Update container stat entry, this will trigger a removal operation if it
+ // exists in cache.
+ synchronized (scm.getContainerReportCache()) {
+ String datanodeUuid = reports.getDatanodeDetails().getUuid();
+ if (datanodeUuid != null && newStat != null) {
+ scm.getContainerReportCache().put(datanodeUuid, newStat);
+ // update global view container metrics
+ StorageContainerManager.getMetrics().incrContainerStat(newStat);
+ }
+ }
+ }
+
+
+ /**
+  * Processes block deletion acknowledgements sent by a datanode.
+  *
+  * <p>Each successful transaction is committed in the deleted block log;
+  * failed ones are only logged with a warning.
+  *
+  * @param acks per-transaction deletion results reported by the datanode
+  * @return the default (empty) acknowledgement response
+  * @throws IOException as declared by the protocol interface
+  */
+ @Override
+ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+     ContainerBlocksDeletionACKProto acks) throws IOException {
+   for (DeleteBlockTransactionResult result : acks.getResultsList()) {
+     LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
+         + "success={}", result.getTxID(), result.getSuccess());
+     if (!result.getSuccess()) {
+       LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+           + "TX in next interval", result.getTxID());
+       continue;
+     }
+     LOG.debug("Purging TXID={} from block deletion log",
+         result.getTxID());
+     scm.getScmBlockManager().getDeletedBlockLog()
+         .commitTransactions(Collections.singletonList(result.getTxID()));
+   }
+   return ContainerBlocksDeletionACKResponseProto.getDefaultInstance();
+ }
+
+ /**
+  * Logs the datanode RPC listen address and starts the RPC server.
+  */
+ public void start() {
+   final String startupMessage =
+       StorageContainerManager.buildRpcServerStartMessage(
+           "RPC server for DataNodes", getDatanodeRpcAddress());
+   LOG.info(startupMessage);
+   getDatanodeRpcServer().start();
+ }
+
+ /**
+  * Stops the datanode RPC server and then closes the SCM node manager.
+  *
+  * <p>A failure while stopping the RPC server is logged and not rethrown,
+  * so the node manager cleanup below still runs.
+  */
+ public void stop() {
+ try {
+ LOG.info("Stopping the RPC server for DataNodes");
+ datanodeRpcServer.stop();
+ } catch (Exception ex) {
+ LOG.error(" datanodeRpcServer stop failed.", ex);
+ }
+ // Close the node manager obtained from SCM via IOUtils.cleanupWithLogger.
+ IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+ }
+
+ /**
+  * Joins the datanode RPC server by delegating to its {@code join()}.
+  *
+  * @throws InterruptedException as declared by the RPC server's join
+  */
+ public void join() throws InterruptedException {
+ LOG.trace("Join RPC server for DataNodes");
+ datanodeRpcServer.join();
+ }
+
+ /**
+ * Returns a SCMCommandRepose from the SCM Command.
+ *
+ * @param cmd - Cmd
+ * @return SCMCommandResponseProto
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
+ getCommandResponse(
+ SCMCommand cmd, final String datanodeID) throws IOException {
+ SCMCmdType type = cmd.getType();
+ SCMCommandResponseProto.Builder builder =
+ SCMCommandResponseProto.newBuilder().setDatanodeUUID(datanodeID);
+ switch (type) {
+ case registeredCommand:
+ return builder
+ .setCmdType(registeredCommand)
+ .setRegisteredProto(SCMRegisteredCmdResponseProto
+ .getDefaultInstance())
+ .build();
+ case versionCommand:
+ return builder
+ .setCmdType(versionCommand)
+ .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
+ .build();
+ case sendContainerReport:
+ return builder
+ .setCmdType(sendContainerReport)
+ .setSendReport(SendContainerReportProto.getDefaultInstance())
+ .build();
+ case reregisterCommand:
+ return builder
+ .setCmdType(reregisterCommand)
+ .setReregisterProto(SCMReregisterCmdResponseProto
+ .getDefaultInstance())
+ .build();
+ case deleteBlocksCommand:
+ // Once SCM sends out the deletion message, increment the count.
+ // this is done here instead of when SCM receives the ACK, because
+ // DN might not be able to response the ACK for sometime. In case
+ // it times out, SCM needs to re-send the message some more times.
+ List<Long> txs =
+ ((DeleteBlocksCommand) cmd)
+ .blocksTobeDeleted()
+ .stream()
+ .map(tx -> tx.getTxID())
+ .collect(Collectors.toList());
+ scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
+ return builder
+ .setCmdType(deleteBlocksCommand)
+ .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+ .build();
+ case closeContainerCommand:
+ return builder
+ .setCmdType(closeContainerCommand)
+ .setCloseContainerProto(((CloseContainerCommand) cmd).getProto())
+ .build();
+ default:
+ throw new IllegalArgumentException("Not implemented");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
new file mode 100644
index 0000000..22d4d56
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
+
+import java.util.Map;
+
+/**
+ * JMX management interface exposing SCM information.
+ */
+@InterfaceAudience.Private
+public interface SCMMXBean extends ServiceRuntimeInfo {
+
+ /**
+ * Get the SCM RPC server port used to listen to datanode requests.
+ * @return SCM datanode RPC server port
+ */
+ String getDatanodeRpcPort();
+
+ /**
+ * Get the SCM RPC server port used to listen to client requests.
+ * @return SCM client RPC server port
+ */
+ String getClientRpcPort();
+
+ /**
+ * Get container report info that includes container IO stats of nodes.
+ * @return mapping from datanode UUID to the report as a JSON string
+ */
+ Map<String, String> getContainerReport();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
new file mode 100644
index 0000000..be6c1af
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+import org.apache.hadoop.ozone.common.Storage;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
+import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
+
+/**
+ * Manages the storage directories used by the SCM, together with the SCM
+ * identifier kept in the storage info.
+ */
+public class SCMStorage extends Storage {
+
+  /**
+   * Construct SCMStorage.
+   *
+   * @param conf configuration used to resolve the Ozone metadata directory
+   * @throws IOException if any directories are inaccessible.
+   */
+  public SCMStorage(OzoneConfiguration conf) throws IOException {
+    super(NodeType.SCM, getOzoneMetaDirPath(conf), STORAGE_DIR);
+  }
+
+  /**
+   * Records the SCM ID in the storage info.
+   *
+   * @param scmId identifier to store
+   * @throws IOException if the storage state is already INITIALIZED
+   */
+  public void setScmId(String scmId) throws IOException {
+    if (getState() == StorageState.INITIALIZED) {
+      throw new IOException("SCM is already initialized.");
+    }
+    getStorageInfo().setProperty(SCM_ID, scmId);
+  }
+
+  /**
+   * Retrieves the SCM ID from the storage info.
+   *
+   * @return SCM_ID
+   */
+  public String getScmId() {
+    return getStorageInfo().getProperty(SCM_ID);
+  }
+
+  /**
+   * Builds the node properties: the stored SCM ID, or a freshly generated
+   * random UUID when none has been stored yet.
+   *
+   * @return properties containing the SCM_ID entry
+   */
+  @Override
+  protected Properties getNodeProperties() {
+    final String storedId = getScmId();
+    final Properties nodeProperties = new Properties();
+    nodeProperties.setProperty(SCM_ID,
+        storedId != null ? storedId : UUID.randomUUID().toString());
+    return nodeProperties;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
new file mode 100644
index 0000000..af7dd3f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -0,0 +1,722 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+/**
+ * StorageContainerManager is the main entry point for the service that
+ * provides information about
+ * which SCM nodes host containers.
+ *
+ * <p>DataNodes report to StorageContainerManager using heartbeat messages.
+ * SCM allocates containers
+ * and returns a pipeline.
+ *
+ * <p>A client once it gets a pipeline (a list of datanodes) will connect to
+ * the datanodes and
+ * create a container, which then can be used to store data.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
+public final class StorageContainerManager extends ServiceRuntimeInfoImpl
+ implements SCMMXBean {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(StorageContainerManager.class);
+ private static final String USAGE =
+ "Usage: \n ozone scm [genericOptions] "
+ + "[ "
+ + StartupOption.INIT.getName()
+ + " [ "
+ + StartupOption.CLUSTERID.getName()
+ + " <cid> ] ]\n "
+ + "ozone scm [genericOptions] [ "
+ + StartupOption.GENCLUSTERID.getName()
+ + " ]\n "
+ + "ozone scm [ "
+ + StartupOption.HELP.getName()
+ + " ]\n";
+ /**
+ * SCM metrics.
+ */
+ private static SCMMetrics metrics;
+
+ /*
+ * RPC Endpoints exposed by SCM.
+ */
+ private final SCMDatanodeProtocolServer datanodeProtocolServer;
+ private final SCMBlockProtocolServer blockProtocolServer;
+ private final SCMClientProtocolServer clientProtocolServer;
+
+ /*
+ * State Managers of SCM.
+ */
+ private final NodeManager scmNodeManager;
+ private final Mapping scmContainerManager;
+ private final BlockManager scmBlockManager;
+ private final SCMStorage scmStorage;
+ /*
+ * HTTP endpoint for JMX access.
+ */
+ private final StorageContainerManagerHttpServer httpServer;
+ /**
+ * SCM super user.
+ */
+ private final String scmUsername;
+ private final Collection<String> scmAdminUsernames;
+ /**
+ * SCM mxbean.
+ */
+ private ObjectName scmInfoBeanName;
+ /**
+ * Key = DatanodeUuid, value = ContainerStat.
+ */
+ private Cache<String, ContainerStat> containerReportCache;
+
+ /**
+ * Creates a new StorageContainerManager. Configuration will be updated
+ * with information on the
+ * actual listening addresses used for RPC servers.
+ *
+ * <p>Construction order: metrics and the container report cache, the SCM
+ * storage check, the node/container/block managers, the administrator
+ * list, the three RPC protocol servers, the HTTP server and finally the
+ * MXBean registration.
+ *
+ * @param conf configuration
+ * @throws IOException as declared; an {@code SCMException} with
+ * {@code SCM_NOT_INITIALIZED} is raised when the SCM storage state is not
+ * INITIALIZED
+ */
+ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
+
+ // The same cache size is handed to both the container mapping and the
+ // block manager below.
+ final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+ OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
+ StorageContainerManager.initMetrics();
+ initContainerReportCache(conf);
+
+ // Refuse to construct SCM unless its storage has been initialized.
+ scmStorage = new SCMStorage(conf);
+ if (scmStorage.getState() != StorageState.INITIALIZED) {
+ throw new SCMException("SCM not initialized.", ResultCodes
+ .SCM_NOT_INITIALIZED);
+ }
+
+ scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
+ scmContainerManager = new ContainerMapping(conf, getScmNodeManager(),
+ cacheSize);
+
+ scmBlockManager =
+ new BlockManagerImpl(conf, getScmNodeManager(), scmContainerManager,
+ cacheSize);
+
+ // The user running SCM is always added to the configured administrators.
+ scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
+ .OZONE_ADMINISTRATORS);
+ scmUsername = UserGroupInformation.getCurrentUser().getUserName();
+ if (!scmAdminUsernames.contains(scmUsername)) {
+ scmAdminUsernames.add(scmUsername);
+ }
+
+ // Each protocol server receives this (still under construction) instance.
+ datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this);
+ blockProtocolServer = new SCMBlockProtocolServer(conf, this);
+ clientProtocolServer = new SCMClientProtocolServer(conf, this);
+ httpServer = new StorageContainerManagerHttpServer(conf);
+
+ registerMXBean();
+ }
+
+ /**
+  * Formats the log line describing where an RPC server listens.
+  *
+  * @param description RPC server description
+  * @param addr RPC server listening address, or null if there is none
+  * @return "&lt;description&gt; is listening at &lt;addr&gt;", or
+  *     "&lt;description&gt; not started" when {@code addr} is null
+  */
+ public static String buildRpcServerStartMessage(String description,
+     InetSocketAddress addr) {
+   if (addr == null) {
+     return String.format("%s not started", description);
+   }
+   return String.format("%s is listening at %s", description,
+       addr.toString());
+ }
+
+ /**
+  * Builds an RPC server for the given protocol and registers the protobuf
+  * service with it. The returned server is not started by this method;
+  * callers invoke {@code start()} on it themselves.
+  *
+  * @param conf configuration
+  * @param addr configured address of RPC server
+  * @param protocol RPC protocol provided by RPC server
+  * @param instance RPC protocol implementation instance
+  * @param handlerCount RPC server handler count
+  * @return RPC server
+  * @throws IOException if there is an I/O error while creating RPC server
+  */
+ public static RPC.Server startRpcServer(
+     OzoneConfiguration conf,
+     InetSocketAddress addr,
+     Class<?> protocol,
+     BlockingService instance,
+     int handlerCount)
+     throws IOException {
+   final RPC.Builder serverBuilder = new RPC.Builder(conf);
+   serverBuilder.setProtocol(protocol);
+   serverBuilder.setInstance(instance);
+   serverBuilder.setBindAddress(addr.getHostString());
+   serverBuilder.setPort(addr.getPort());
+   serverBuilder.setNumHandlers(handlerCount);
+   serverBuilder.setVerbose(false);
+   serverBuilder.setSecretManager(null);
+   final RPC.Server server = serverBuilder.build();
+
+   DFSUtil.addPBProtocol(conf, protocol, instance, server);
+   return server;
+ }
+
+ /**
+  * Main entry point for starting StorageContainerManager.
+  *
+  * <p>Exits with status 0 after printing help, exits with status 1 when the
+  * generic options cannot be parsed, and otherwise creates, starts and
+  * joins the SCM. Any Throwable raised on that path is logged and passed
+  * to {@code terminate(1, t)}.
+  *
+  * @param argv arguments
+  * @throws IOException if startup fails due to I/O error
+  */
+ public static void main(String[] argv) throws IOException {
+   if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
+     System.exit(0);
+   }
+   try {
+     final OzoneConfiguration configuration = new OzoneConfiguration();
+     final GenericOptionsParser optionsParser =
+         new GenericOptionsParser(configuration, argv);
+     if (!optionsParser.isParseSuccessful()) {
+       System.err.println("USAGE: " + USAGE + "\n");
+       optionsParser.printGenericCommandUsage(System.err);
+       System.exit(1);
+     }
+     StringUtils.startupShutdownMessage(StorageContainerManager.class, argv,
+         LOG);
+     final StorageContainerManager manager =
+         createSCM(optionsParser.getRemainingArgs(), configuration);
+     if (manager == null) {
+       return;
+     }
+     manager.start();
+     manager.join();
+   } catch (Throwable t) {
+     LOG.error("Failed to start the StorageContainerManager.", t);
+     terminate(1, t);
+   }
+ }
+
+ /**
+  * Prints the usage text followed by an extra newline.
+  *
+  * @param out stream the usage text is written to
+  */
+ private static void printUsage(PrintStream out) {
+ out.println(USAGE + "\n");
+ }
+
+ /**
+  * Creates a StorageContainerManager, or performs the one-shot action
+  * selected by the command line arguments.
+  *
+  * <p>INIT, GENCLUSTERID and HELP call {@code terminate} and yield null;
+  * invalid arguments print the usage, call {@code terminate(1)} and yield
+  * null; any other startup option constructs a new SCM.
+  *
+  * @param argv arguments remaining after generic option parsing
+  * @param conf configuration
+  * @return a new StorageContainerManager, or null for the cases above
+  * @throws IOException if initialization or construction fails
+  */
+ public static StorageContainerManager createSCM(String[] argv,
+     OzoneConfiguration conf)
+     throws IOException {
+   if (!HddsUtils.isHddsEnabled(conf)) {
+     System.err.println("SCM cannot be started in secure mode or when "
+         + OZONE_ENABLED + " is set to false");
+     System.exit(1);
+   }
+
+   final StartupOption option = parseArguments(argv);
+   if (option == null) {
+     printUsage(System.err);
+     terminate(1);
+     return null;
+   }
+   if (option == StartupOption.INIT) {
+     terminate(scmInit(conf) ? 0 : 1);
+     return null;
+   }
+   if (option == StartupOption.GENCLUSTERID) {
+     System.out.println("Generating new cluster id:");
+     System.out.println(StorageInfo.newClusterID());
+     terminate(0);
+     return null;
+   }
+   if (option == StartupOption.HELP) {
+     printUsage(System.err);
+     terminate(0);
+     return null;
+   }
+   return new StorageContainerManager(conf);
+ }
+
+ /**
+  * Routine to set up the Version info for StorageContainerManager.
+  *
+  * <p>When the storage is already INITIALIZED the existing cluster id is
+  * reported and reused. Otherwise the cluster id supplied with the INIT
+  * option (if any) is applied and the storage is initialized.
+  *
+  * @param conf OzoneConfiguration
+  * @return true if SCM initialization is successful, false otherwise.
+  * @throws IOException if init fails due to I/O error
+  */
+ public static boolean scmInit(OzoneConfiguration conf) throws IOException {
+   final SCMStorage storage = new SCMStorage(conf);
+   if (storage.getState() == StorageState.INITIALIZED) {
+     System.out.println(
+         "SCM already initialized. Reusing existing cluster id for sd="
+             + storage.getStorageDir()
+             + ";cid="
+             + storage.getClusterID());
+     return true;
+   }
+
+   try {
+     final String requestedClusterId = StartupOption.INIT.getClusterId();
+     final boolean hasRequestedId =
+         requestedClusterId != null && !requestedClusterId.isEmpty();
+     if (hasRequestedId) {
+       storage.setClusterId(requestedClusterId);
+     }
+     storage.initialize();
+     // The two literals are joined without a space, as in the existing
+     // output format.
+     System.out.println(
+         "SCM initialization succeeded."
+             + "Current cluster id for sd="
+             + storage.getStorageDir()
+             + ";cid="
+             + storage.getClusterID());
+     return true;
+   } catch (IOException ioe) {
+     LOG.error("Could not initialize SCM version file", ioe);
+     return false;
+   }
+ }
+
+ /**
+  * Maps the command line arguments to a startup option.
+  *
+  * <p>No arguments select REGULAR. The INIT flag may be followed only by
+  * the CLUSTERID flag and a non-empty cluster id (at most three arguments
+  * in total). The GENCLUSTERID flag is accepted only as the sole argument.
+  * Arguments that match neither flag leave the option unchanged, so HELP
+  * is returned when nothing else was recognized.
+  *
+  * @param args arguments to parse, may be null
+  * @return the selected startup option, or null if the arguments are
+  *     invalid
+  */
+ private static StartupOption parseArguments(String[] args) {
+ int argsLen = (args == null) ? 0 : args.length;
+ // HELP is the fallback for anything that is not recognized below.
+ StartupOption startOpt = StartupOption.HELP;
+ if (argsLen == 0) {
+ startOpt = StartupOption.REGULAR;
+ }
+ for (int i = 0; i < argsLen; i++) {
+ String cmd = args[i];
+ if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
+ startOpt = StartupOption.INIT;
+ if (argsLen > 3) {
+ return null;
+ }
+ // The inner loop shares the index i with the outer loop and consumes
+ // every remaining argument, so the outer loop ends afterwards.
+ for (i = i + 1; i < argsLen; i++) {
+ if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
+ i++;
+ if (i < argsLen && !args[i].isEmpty()) {
+ // startOpt is StartupOption.INIT here, so the cluster id is stored
+ // on that enum constant.
+ startOpt.setClusterId(args[i]);
+ } else {
+ // if no cluster id specified or is empty string, return null
+ LOG.error(
+ "Must specify a valid cluster ID after the "
+ + StartupOption.CLUSTERID.getName()
+ + " flag");
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+ } else {
+ if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
+ // GENCLUSTERID must be the only argument.
+ if (argsLen > 1) {
+ return null;
+ }
+ startOpt = StartupOption.GENCLUSTERID;
+ }
+ }
+ }
+ return startOpt;
+ }
+
+ /**
+ * Initialize SCM metrics.
+ *
+ * <p>Assigns the result of SCMMetrics.create() to the static metrics field,
+ * replacing whatever instance was stored there before.
+ */
+ public static void initMetrics() {
+ metrics = SCMMetrics.create();
+ }
+
+ /**
+  * Returns the SCM metrics instance.
+  *
+  * <p>When the shared metrics field is null, a new instance is created for
+  * the caller; that new instance is not stored in the field.
+  *
+  * @return the shared metrics if set, otherwise a newly created SCMMetrics
+  */
+ public static SCMMetrics getMetrics() {
+   if (metrics != null) {
+     return metrics;
+   }
+   return SCMMetrics.create();
+ }
+
+ /**
+ * Returns the SCM storage object held by this StorageContainerManager.
+ *
+ * @return the scmStorage field
+ */
+ public SCMStorage getScmStorage() {
+ return scmStorage;
+ }
+
+ /**
+ * Returns the server object that handles the datanode protocol.
+ *
+ * @return the datanodeProtocolServer field
+ */
+ public SCMDatanodeProtocolServer getDatanodeProtocolServer() {
+ return datanodeProtocolServer;
+ }
+
+ /**
+ * Returns the server object that handles the block protocol.
+ *
+ * @return the blockProtocolServer field
+ */
+ public SCMBlockProtocolServer getBlockProtocolServer() {
+ return blockProtocolServer;
+ }
+
+ /**
+ * Returns the server object that handles the client protocol.
+ *
+ * @return the clientProtocolServer field
+ */
+ public SCMClientProtocolServer getClientProtocolServer() {
+ return clientProtocolServer;
+ }
+
+ /**
+  * Initializes the cache of container reports that are sent from datanodes.
+  *
+  * <p>Expiry time and maximum size are set to the largest representable
+  * values. Whenever an entry is removed, the removal listener passes the
+  * removed ContainerStat to metrics.decrContainerStat while holding the
+  * cache's monitor.
+  *
+  * @param conf configuration; not read by the current implementation
+  */
+ private void initContainerReportCache(OzoneConfiguration conf) {
+   RemovalListener<String, ContainerStat> onEntryRemoved =
+       new RemovalListener<String, ContainerStat>() {
+         @Override
+         public void onRemoval(
+             RemovalNotification<String, ContainerStat> notification) {
+           synchronized (containerReportCache) {
+             // remove invalid container report
+             ContainerStat removedStat = notification.getValue();
+             metrics.decrContainerStat(removedStat);
+             LOG.debug(
+                 "Remove expired container stat entry for datanode: {}.",
+                 notification.getKey());
+           }
+         }
+       };
+   containerReportCache =
+       CacheBuilder.newBuilder()
+           .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+           .maximumSize(Integer.MAX_VALUE)
+           .removalListener(onEntryRemoved)
+           .build();
+ }
+
+ /**
+  * Registers this object through MBeans.register under the names
+  * "StorageContainerManager" / "StorageContainerManagerInfo" with the
+  * property component=ServerRuntime, and keeps the returned bean name so
+  * that it can be unregistered later.
+  */
+ private void registerMXBean() {
+   Map<String, String> beanProperties = new HashMap<>();
+   beanProperties.put("component", "ServerRuntime");
+   this.scmInfoBeanName = MBeans.register("StorageContainerManager",
+       "StorageContainerManagerInfo", beanProperties, this);
+ }
+
+ /**
+  * Unregisters the bean recorded in scmInfoBeanName, if any, and clears the
+  * field so that a repeated call does nothing.
+  */
+ private void unregisterMXBean() {
+   if (this.scmInfoBeanName == null) {
+     return;
+   }
+   MBeans.unregister(this.scmInfoBeanName);
+   this.scmInfoBeanName = null;
+ }
+
+ /**
+ * Looks up a container by name via the SCM container manager.
+ *
+ * @param containerName name of the container to look up
+ * @return the ContainerInfo returned by scmContainerManager.getContainer
+ * @throws IOException if the container manager lookup throws
+ */
+ @VisibleForTesting
+ public ContainerInfo getContainerInfo(String containerName) throws
+ IOException {
+ return scmContainerManager.getContainer(containerName);
+ }
+
+ /**
+ * Returns the listening address of the StorageContainerLocationProtocol RPC
+ * server, as reported by the client protocol server.
+ *
+ * @return listen address of the StorageContainerLocationProtocol RPC server
+ */
+ @VisibleForTesting
+ public InetSocketAddress getClientRpcAddress() {
+ return getClientProtocolServer().getClientRpcAddress();
+ }
+
+ /**
+  * Returns the port of the client RPC address as a string.
+  *
+  * @return the port number in decimal, or "0" when no address is available
+  */
+ @Override
+ public String getClientRpcPort() {
+   InetSocketAddress clientAddress = getClientRpcAddress();
+   if (clientAddress == null) {
+     return "0";
+   }
+   return Integer.toString(clientAddress.getPort());
+ }
+
+ /**
+ * Returns the listening address of the StorageDatanode Protocol RPC server,
+ * as reported by the datanode protocol server.
+ *
+ * @return address on which datanodes communicate with SCM.
+ */
+ public InetSocketAddress getDatanodeRpcAddress() {
+ return getDatanodeProtocolServer().getDatanodeRpcAddress();
+ }
+
+ /**
+  * Returns the port of the datanode RPC address as a string.
+  *
+  * @return the port number in decimal, or "0" when no address is available
+  */
+ @Override
+ public String getDatanodeRpcPort() {
+   InetSocketAddress datanodeAddress = getDatanodeRpcAddress();
+   if (datanodeAddress == null) {
+     return "0";
+   }
+   return Integer.toString(datanodeAddress.getPort());
+ }
+
+ /**
+  * Start service.
+  *
+  * <p>Starts the client, block and datanode protocol servers in that order,
+  * logging each server's address before it is started, then starts the HTTP
+  * server and the block manager, and finally calls setStartTime(). The
+  * default metrics system is initialized before the first server starts.
+  *
+  * @throws IOException propagated from the components being started
+  */
+ public void start() throws IOException {
+   LOG.info(
+       buildRpcServerStartMessage(
+           "StorageContainerLocationProtocol RPC server",
+           getClientRpcAddress()));
+   DefaultMetricsSystem.initialize("StorageContainerManager");
+   getClientProtocolServer().start();
+
+   LOG.info(buildRpcServerStartMessage("ScmBlockLocationProtocol RPC " +
+       "server", getBlockProtocolServer().getBlockRpcAddress()));
+   getBlockProtocolServer().start();
+
+   // Log label previously misspelled the protocol name as "ScmDatanodeProtocl".
+   LOG.info(buildRpcServerStartMessage("ScmDatanodeProtocol RPC " +
+       "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
+   getDatanodeProtocolServer().start();
+
+   httpServer.start();
+   scmBlockManager.start();
+
+   setStartTime();
+ }
+
+ /**
+ * Stop service.
+ *
+ * <p>Stops, in order, the datanode, block and client protocol servers, the
+ * HTTP server and the block manager. Each of these steps has its own
+ * try/catch: a failure is logged and the remaining steps still run.
+ * Afterwards the container report cache is emptied, the metrics and the
+ * MXBean are unregistered, and the container manager is handed to
+ * IOUtils.cleanupWithLogger.
+ */
+ public void stop() {
+
+ try {
+ LOG.info("Stopping datanode service RPC server");
+ getDatanodeProtocolServer().stop();
+
+ } catch (Exception ex) {
+ LOG.error("Storage Container Manager datanode RPC stop failed.", ex);
+ }
+
+ try {
+ LOG.info("Stopping block service RPC server");
+ getBlockProtocolServer().stop();
+ } catch (Exception ex) {
+ LOG.error("Storage Container Manager blockRpcServer stop failed.", ex);
+ }
+
+ try {
+ LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
+ getClientProtocolServer().stop();
+ } catch (Exception ex) {
+ LOG.error("Storage Container Manager clientRpcServer stop failed.", ex);
+ }
+
+ try {
+ LOG.info("Stopping Storage Container Manager HTTP server.");
+ httpServer.stop();
+ } catch (Exception ex) {
+ LOG.error("Storage Container Manager HTTP server stop failed.", ex);
+ }
+
+ try {
+ LOG.info("Stopping Block Manager Service.");
+ scmBlockManager.stop();
+ } catch (Exception ex) {
+ LOG.error("SCM block manager service stop failed.", ex);
+ }
+
+ // The cache may never have been created, hence the null check.
+ if (containerReportCache != null) {
+ containerReportCache.invalidateAll();
+ containerReportCache.cleanUp();
+ }
+
+ if (metrics != null) {
+ metrics.unRegister();
+ }
+
+ unregisterMXBean();
+ IOUtils.cleanupWithLogger(LOG, scmContainerManager);
+ }
+
+ /**
+ * Wait until service has completed shutdown.
+ *
+ * <p>Joins the block, client and datanode protocol servers in turn. If the
+ * calling thread is interrupted while waiting, its interrupt status is
+ * restored, the interruption is logged, and the method returns without
+ * joining the remaining servers.
+ */
+ public void join() {
+ try {
+ getBlockProtocolServer().join();
+ getClientProtocolServer().join();
+ getDatanodeProtocolServer().join();
+ } catch (InterruptedException e) {
+ // Preserve the interrupt for callers further up the stack.
+ Thread.currentThread().interrupt();
+ LOG.info("Interrupted during StorageContainerManager join.");
+ }
+ }
+
+ /**
+ * Returns the number of datanodes in the given state, as reported by the
+ * SCM node manager.
+ *
+ * @param nodestate Healthy, Dead etc.
+ * @return int -- count
+ */
+ public int getNodeCount(NodeState nodestate) {
+ return scmNodeManager.getNodeCount(nodestate);
+ }
+
+ /**
+ * Returns SCM container manager.
+ *
+ * @return the scmContainerManager field
+ */
+ @VisibleForTesting
+ public Mapping getScmContainerManager() {
+ return scmContainerManager;
+ }
+
+ /**
+ * Returns the SCM node manager.
+ *
+ * @return the scmNodeManager field
+ */
+ @VisibleForTesting
+ public NodeManager getScmNodeManager() {
+ return scmNodeManager;
+ }
+
+ /**
+ * Returns the SCM block manager.
+ *
+ * @return the scmBlockManager field
+ */
+ @VisibleForTesting
+ public BlockManager getScmBlockManager() {
+ return scmBlockManager;
+ }
+
+ /**
+  * Returns the user name of the remote user reported by
+  * ProtobufRpcEngine.Server.getRemoteUser().
+  *
+  * @return the remote user's name, or null when no remote user is reported
+  */
+ @VisibleForTesting
+ public String getPpcRemoteUsername() {
+   UserGroupInformation caller = ProtobufRpcEngine.Server.getRemoteUser();
+   if (caller == null) {
+     return null;
+   }
+   return caller.getUserName();
+ }
+
+ public void checkAdminAccess() throws IOException {
+ String remoteUser = getPpcRemoteUsername();
+ if (remoteUser != null) {
+ if (!scmAdminUsernames.contains(remoteUser)) {
+ throw new IOException(
+ "Access denied for user " + remoteUser + ". Superuser privilege " +
+ "is required.");
+ }
+ }
+ }
+
+ /**
+ * Invalidate container stat entry for given datanode.
+ *
+ * <p>The invalidation is performed while holding the cache's monitor.
+ *
+ * @param datanodeUuid cache key of the entry to invalidate
+ */
+ public void removeContainerReport(String datanodeUuid) {
+ synchronized (containerReportCache) {
+ containerReportCache.invalidate(datanodeUuid);
+ }
+ }
+
+ /**
+  * Looks up the cached container stat of the specified datanode.
+  *
+  * @param datanodeUuid cache key of the datanode whose stat is requested
+  * @return the cached ContainerStat, or null if the cache holds no entry for
+  *         the key
+  */
+ public ContainerStat getContainerReport(String datanodeUuid) {
+   synchronized (containerReportCache) {
+     return containerReportCache.getIfPresent(datanodeUuid);
+   }
+ }
+
+ /**
+ * Returns a view of the container stat entries. Modifications made to the
+ * map will directly affect the cache.
+ *
+ * @return the map view obtained from containerReportCache.asMap()
+ */
+ public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
+ return containerReportCache.asMap();
+ }
+
+ /**
+  * Builds a snapshot of the container report cache with every ContainerStat
+  * rendered through toJsonString().
+  *
+  * @return a new map from cache key to the JSON string of its ContainerStat
+  */
+ @Override
+ public Map<String, String> getContainerReport() {
+   Map<String, String> statsByKey = new HashMap<>();
+   synchronized (containerReportCache) {
+     for (Map.Entry<String, ContainerStat> report
+         : containerReportCache.asMap().entrySet()) {
+       statsByKey.put(report.getKey(), report.getValue().toJsonString());
+     }
+   }
+   return statsByKey;
+ }
+
+ /**
+ * Startup options.
+ *
+ * <p>Each constant carries the command line flag that selects it. A constant
+ * also has a mutable cluster id; in this class it is written on INIT by
+ * parseArguments and read back from INIT by scmInit.
+ */
+ public enum StartupOption {
+ INIT("-init"),
+ CLUSTERID("-clusterid"),
+ GENCLUSTERID("-genclusterid"),
+ REGULAR("-regular"),
+ HELP("-help");
+
+ // Command line flag for this option.
+ private final String name;
+ // Mutable state stored on the enum constant itself.
+ // NOTE(review): since enum constants are shared, a value set here stays
+ // visible to later callers in the same JVM -- confirm this is intended.
+ private String clusterId = null;
+
+ StartupOption(String arg) {
+ this.name = arg;
+ }
+
+ /**
+ * @return the cluster id stored on this constant, or null if none was set
+ */
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ /**
+ * Stores the cluster id on this constant. A null or empty value is ignored,
+ * leaving any previously stored id unchanged.
+ *
+ * @param cid cluster id to store
+ */
+ public void setClusterId(String cid) {
+ if (cid != null && !cid.isEmpty()) {
+ clusterId = cid;
+ }
+ }
+
+ /**
+ * @return the command line flag of this option
+ */
+ public String getName() {
+ return name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
new file mode 100644
index 0000000..75b2036
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManagerHttpServer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.server.BaseHttpServer;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import java.io.IOException;
+
+/**
+ * HttpServer2 wrapper for the Ozone Storage Container Manager.
+ */
+public class StorageContainerManagerHttpServer extends BaseHttpServer {
+
+ public StorageContainerManagerHttpServer(Configuration conf)
+ throws IOException {
+ super(conf, "scm");
+ }
+
+ @Override protected String getHttpAddressKey() {
+ return ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY;
+ }
+
+ @Override protected String getHttpBindHostKey() {
+ return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_KEY;
+ }
+
+ @Override protected String getHttpsAddressKey() {
+ return ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY;
+ }
+
+ @Override protected String getHttpsBindHostKey() {
+ return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_HOST_KEY;
+ }
+
+ @Override protected String getBindHostDefault() {
+ return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_DEFAULT;
+ }
+
+ @Override protected int getHttpBindPortDefault() {
+ return ScmConfigKeys.OZONE_SCM_HTTP_BIND_PORT_DEFAULT;
+ }
+
+ @Override protected int getHttpsBindPortDefault() {
+ return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_PORT_DEFAULT;
+ }
+
+ @Override protected String getKeytabFile() {
+ return ScmConfigKeys.OZONE_SCM_KEYTAB_FILE;
+ }
+
+ @Override protected String getSpnegoPrincipal() {
+ return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
+ }
+
+ @Override protected String getEnabledKey() {
+ return ScmConfigKeys.OZONE_SCM_HTTP_ENABLED_KEY;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/package-info.java
new file mode 100644
index 0000000..fe07272
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
+ * information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java
index 5ca9dd7..0dbb7c1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManagerHttpServer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManagerHttpServer;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.http.HttpConfig;
@@ -36,7 +37,9 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
@@ -95,7 +98,7 @@ public class TestStorageContainerManagerHttpServer {
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, policy.name());
conf.set(ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, "localhost:0");
- InetSocketAddress addr = InetSocketAddress.createUnresolved("localhost", 0);
+ InetSocketAddress.createUnresolved("localhost", 0);
StorageContainerManagerHttpServer server = null;
try {
server = new StorageContainerManagerHttpServer(conf);
@@ -128,7 +131,7 @@ public class TestStorageContainerManagerHttpServer {
URLConnection conn = connectionFactory.openConnection(url);
conn.connect();
conn.getContent();
- } catch (Exception e) {
+ } catch (IOException e) {
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/common/src/main/bin/ozone
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/bin/ozone b/hadoop-ozone/common/src/main/bin/ozone
index 7419743..00261c7 100755
--- a/hadoop-ozone/common/src/main/bin/ozone
+++ b/hadoop-ozone/common/src/main/bin/ozone
@@ -108,7 +108,7 @@ function ozonecmd_case
;;
scm)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
- HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.StorageContainerManager'
+ HADOOP_CLASSNAME='org.apache.hadoop.hdds.scm.server.StorageContainerManager'
hadoop_debug "Appending HDFS_STORAGECONTAINERMANAGER_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_STORAGECONTAINERMANAGER_OPTS}"
;;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 87d203e..f745788 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.junit.After;
@@ -70,7 +70,8 @@ public class TestContainerStateManager {
public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
String container1 = "container" + RandomStringUtils.randomNumeric(5);
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
@@ -87,7 +88,8 @@ public class TestContainerStateManager {
// Check there are two containers in ALLOCATED state after allocation
String container2 = "container" + RandomStringUtils.randomNumeric(5);
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2, containerOwner);
int numContainers = containerStateManager
.getMatchingContainerIDs(containerOwner,
@@ -101,7 +103,8 @@ public class TestContainerStateManager {
// Allocate 5 containers in ALLOCATED state and 5 in CREATING state
String cname = "container" + RandomStringUtils.randomNumeric(5);
for (int i = 0; i < 10; i++) {
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname + i, containerOwner);
if (i >= 5) {
scm.getScmContainerManager()
@@ -128,7 +131,8 @@ public class TestContainerStateManager {
@Test
public void testGetMatchingContainer() throws IOException {
String container1 = "container-01234";
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, containerOwner);
scmContainerMapping.updateContainerState(container1,
HddsProtos.LifeCycleEvent.CREATE);
@@ -136,7 +140,8 @@ public class TestContainerStateManager {
HddsProtos.LifeCycleEvent.CREATED);
String container2 = "container-56789";
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2, containerOwner);
ContainerInfo info = containerStateManager
@@ -177,7 +182,8 @@ public class TestContainerStateManager {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
String container1 = "container" + RandomStringUtils.randomNumeric(5);
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, containerOwner);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@@ -229,7 +235,8 @@ public class TestContainerStateManager {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// DELETING
String container2 = "container" + RandomStringUtils.randomNumeric(5);
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2, containerOwner);
scmContainerMapping.updateContainerState(container2,
HddsProtos.LifeCycleEvent.CREATE);
@@ -243,7 +250,8 @@ public class TestContainerStateManager {
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED
String container3 = "container" + RandomStringUtils.randomNumeric(5);
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container3, containerOwner);
scmContainerMapping.updateContainerState(container3,
HddsProtos.LifeCycleEvent.CREATE);
@@ -262,7 +270,8 @@ public class TestContainerStateManager {
@Test
public void testUpdatingAllocatedBytes() throws Exception {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
- scm.allocateContainer(xceiverClientManager.getType(),
+ scm.getClientProtocolServer().allocateContainer(
+ xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, containerOwner);
scmContainerMapping.updateContainerState(container1,
HddsProtos.LifeCycleEvent.CREATE);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 8c49f65..091d771 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.hdds.scm.protocolPB
@@ -38,6 +38,17 @@ import java.util.concurrent.TimeoutException;
public interface MiniOzoneCluster {
/**
+ * Returns the Builder to construct MiniOzoneCluster.
+ *
+ * @param conf OzoneConfiguration
+ *
+ * @return MiniOzoneCluster builder
+ */
+ static Builder newBuilder(OzoneConfiguration conf) {
+ return new MiniOzoneClusterImpl.Builder(conf);
+ }
+
+ /**
* Returns the configuration object associated with the MiniOzoneCluster.
*
* @return Configuration
@@ -119,8 +130,8 @@ public interface MiniOzoneCluster {
* @return StorageContainerLocation Client
* @throws IOException
*/
- StorageContainerLocationProtocolClientSideTranslatorPB getStorageContainerLocationClient()
- throws IOException;
+ StorageContainerLocationProtocolClientSideTranslatorPB
+ getStorageContainerLocationClient() throws IOException;
/**
* Restarts StorageContainerManager instance.
@@ -156,19 +167,9 @@ public interface MiniOzoneCluster {
void shutdown();
/**
- * Returns the Builder to construct MiniOzoneCluster.
- *
- * @param conf OzoneConfiguration
- *
- * @return MiniOzoneCluster builder
- */
- static Builder newBuilder(OzoneConfiguration conf) {
- return new MiniOzoneClusterImpl.Builder(conf);
- }
-
- /**
* Builder class for MiniOzoneCluster.
*/
+ @SuppressWarnings("CheckStyle")
abstract class Builder {
protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
@@ -261,7 +262,6 @@ public interface MiniOzoneCluster {
return this;
}
-
/**
* Sets the number of HeartBeat Interval of Datanodes, the value should be
* in MilliSeconds.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 891f67b..17872f4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -35,14 +35,14 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
-import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.ozone.ksm.KSMStorage;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
@@ -179,8 +179,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
* @throws IOException if there is an I/O error
*/
@Override
- public StorageContainerLocationProtocolClientSideTranslatorPB getStorageContainerLocationClient()
- throws IOException {
+ public StorageContainerLocationProtocolClientSideTranslatorPB
+ getStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getClientRpcAddress();
@@ -226,7 +226,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
File baseDir = new File(GenericTestUtils.getTempPath(
MiniOzoneClusterImpl.class.getSimpleName() + "-" +
- scm.getScmInfo().getClusterId()));
+ scm.getClientProtocolServer().getScmInfo().getClusterId()));
FileUtils.deleteDirectory(baseDir);
if (ksm != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index fa307c9..5a5a08b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -32,8 +32,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.StorageContainerManager.StartupOption;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -113,7 +113,7 @@ public class TestStorageContainerManager {
.thenReturn(fakeUser);
try {
- mockScm.deleteContainer("container1");
+ mockScm.getClientProtocolServer().deleteContainer("container1");
fail("Operation should fail, expecting an IOException here.");
} catch (Exception e) {
if (expectPermissionDenied) {
@@ -127,8 +127,8 @@ public class TestStorageContainerManager {
}
try {
- Pipeline pipeLine2 = mockScm.allocateContainer(
- xceiverClientManager.getType(),
+ Pipeline pipeLine2 = mockScm.getClientProtocolServer()
+ .allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "container2", "OZONE");
if (expectPermissionDenied) {
fail("Operation should fail, expecting an IOException here.");
@@ -140,8 +140,8 @@ public class TestStorageContainerManager {
}
try {
- Pipeline pipeLine3 = mockScm.allocateContainer(
- xceiverClientManager.getType(),
+ Pipeline pipeLine3 = mockScm.getClientProtocolServer()
+ .allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, "container3", "OZONE");
if (expectPermissionDenied) {
@@ -155,7 +155,7 @@ public class TestStorageContainerManager {
}
try {
- mockScm.getContainer("container4");
+ mockScm.getClientProtocolServer().getContainer("container4");
fail("Operation should fail, expecting an IOException here.");
} catch (Exception e) {
if (expectPermissionDenied) {
@@ -436,7 +436,7 @@ public class TestStorageContainerManager {
scmStore.initialize();
StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
//Reads the SCM Info from SCM instance
- ScmInfo scmInfo = scm.getScmInfo();
+ ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
Assert.assertEquals(clusterId, scmInfo.getClusterId());
Assert.assertEquals(scmId, scmInfo.getScmId());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index 7005ea0..9917018 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -157,7 +157,7 @@ public class TestStorageContainerManagerHelper {
private MetadataStore getContainerMetadata(String containerName)
throws IOException {
Pipeline pipeline = cluster.getStorageContainerManager()
- .getContainer(containerName);
+ .getClientProtocolServer().getContainer(containerName);
DatanodeDetails leadDN = pipeline.getLeader();
OzoneContainer containerServer =
getContainerServerByDatanodeUuid(leadDN.getUuidString());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
index ad5783e..a2a04e0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.junit.AfterClass;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index a37f005..ae0ffa0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.hdds.scm.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ServicePort;
@@ -646,7 +646,8 @@ public class TestKeySpaceManager {
keys.add(keyArgs.getResourceName());
exception.expect(IOException.class);
exception.expectMessage("Specified block key does not exist");
- cluster.getStorageContainerManager().getBlockLocations(keys);
+ cluster.getStorageContainerManager().getBlockProtocolServer()
+ .getBlockLocations(keys);
// Delete the key again to test deleting non-existing key.
exception.expect(IOException.class);
@@ -818,9 +819,6 @@ public class TestKeySpaceManager {
listKeyArgs = new ListArgs(bucketArgs, null, 100, null);
result = storageHandler.listKeys(listKeyArgs);
Assert.assertEquals(numKeys, result.getKeyList().size());
- List<KeyInfo> allKeys = result.getKeyList().stream()
- .filter(item -> item.getSize() == 4096)
- .collect(Collectors.toList());
// List keys with prefix "aKey".
listKeyArgs = new ListArgs(bucketArgs, "aKey", 100, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
index fffdbff..04473d1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -124,7 +124,7 @@ public class TestSCMCli {
public void testCreateContainer() throws Exception {
String containerName = "containerTestCreate";
try {
- scm.getContainer(containerName);
+ scm.getClientProtocolServer().getContainer(containerName);
fail("should not be able to get the container");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains(
@@ -132,14 +132,16 @@ public class TestSCMCli {
}
String[] args = {"-container", "-create", "-c", containerName};
assertEquals(ResultCode.SUCCESS, cli.run(args));
- Pipeline container = scm.getContainer(containerName);
+ Pipeline container = scm.getClientProtocolServer()
+ .getContainer(containerName);
assertNotNull(container);
assertEquals(containerName, container.getContainerName());
}
private boolean containerExist(String containerName) {
try {
- Pipeline scmPipeline = scm.getContainer(containerName);
+ Pipeline scmPipeline = scm.getClientProtocolServer()
+ .getContainer(containerName);
return scmPipeline != null
&& containerName.equals(scmPipeline.getContainerName());
} catch (IOException e) {
@@ -447,7 +449,8 @@ public class TestSCMCli {
String containerName = "containerTestClose";
String[] args = {"-container", "-create", "-c", containerName};
assertEquals(ResultCode.SUCCESS, cli.run(args));
- Pipeline container = scm.getContainer(containerName);
+ Pipeline container = scm.getClientProtocolServer()
+ .getContainer(containerName);
assertNotNull(container);
assertEquals(containerName, container.getContainerName());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
index 27a9404..372fd3d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.scm;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0c3dc4c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
index c28f68f..332e679 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java
@@ -26,7 +26,7 @@ import java.util.UUID;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -80,7 +80,7 @@ public class TestSCMMetrics {
ContainerReportsRequestProto request = createContainerReport(numReport,
stat, null);
String fstDatanodeUuid = request.getDatanodeDetails().getUuid();
- scmManager.sendContainerReport(request);
+ scmManager.getDatanodeProtocolServer().sendContainerReport(request);
// verify container stat metrics
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -103,7 +103,7 @@ public class TestSCMMetrics {
// add one new report
request = createContainerReport(1, stat, null);
String sndDatanodeUuid = request.getDatanodeDetails().getUuid();
- scmManager.sendContainerReport(request);
+ scmManager.getDatanodeProtocolServer().sendContainerReport(request);
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * (numReport + 1),
@@ -125,12 +125,12 @@ public class TestSCMMetrics {
// Re-send reports but with different value for validating
// the aggregation.
stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
- scmManager.sendContainerReport(createContainerReport(1, stat,
- fstDatanodeUuid));
+ scmManager.getDatanodeProtocolServer().sendContainerReport(
+ createContainerReport(1, stat, fstDatanodeUuid));
stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
- scmManager.sendContainerReport(createContainerReport(1, stat,
- sndDatanodeUuid));
+ scmManager.getDatanodeProtocolServer().sendContainerReport(
+ createContainerReport(1, stat, sndDatanodeUuid));
// the global container metrics value should be updated
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
@@ -175,7 +175,7 @@ public class TestSCMMetrics {
.getDatanodeDetails().getUuidString();
ContainerReportsRequestProto request = createContainerReport(numReport,
stat, datanodeUuid);
- scmManager.sendContainerReport(request);
+ scmManager.getDatanodeProtocolServer().sendContainerReport(request);
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * numReport,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org