You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2018/07/06 19:09:42 UTC
[03/14] hadoop git commit: HDDS-167. Rename KeySpaceManager to
OzoneManager. Contributed by Arpit Agarwal.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
new file mode 100644
index 0000000..2d04452
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining Ozone Manager statistics.
+ */
+@InterfaceAudience.Private
+@Metrics(about="Ozone Manager Metrics", context="dfs")
+public class OMMetrics {
+ private static final String SOURCE_NAME =
+ OMMetrics.class.getSimpleName();
+
+ // OM request type op metrics
+ private @Metric MutableCounterLong numVolumeOps;
+ private @Metric MutableCounterLong numBucketOps;
+ private @Metric MutableCounterLong numKeyOps;
+
+ // OM op metrics
+ private @Metric MutableCounterLong numVolumeCreates;
+ private @Metric MutableCounterLong numVolumeUpdates;
+ private @Metric MutableCounterLong numVolumeInfos;
+ private @Metric MutableCounterLong numVolumeCheckAccesses;
+ private @Metric MutableCounterLong numBucketCreates;
+ private @Metric MutableCounterLong numVolumeDeletes;
+ private @Metric MutableCounterLong numBucketInfos;
+ private @Metric MutableCounterLong numBucketUpdates;
+ private @Metric MutableCounterLong numBucketDeletes;
+ private @Metric MutableCounterLong numKeyAllocate;
+ private @Metric MutableCounterLong numKeyLookup;
+ private @Metric MutableCounterLong numKeyRenames;
+ private @Metric MutableCounterLong numKeyDeletes;
+ private @Metric MutableCounterLong numBucketLists;
+ private @Metric MutableCounterLong numKeyLists;
+ private @Metric MutableCounterLong numVolumeLists;
+ private @Metric MutableCounterLong numKeyCommits;
+ private @Metric MutableCounterLong numAllocateBlockCalls;
+ private @Metric MutableCounterLong numGetServiceLists;
+
+ // Failure Metrics
+ private @Metric MutableCounterLong numVolumeCreateFails;
+ private @Metric MutableCounterLong numVolumeUpdateFails;
+ private @Metric MutableCounterLong numVolumeInfoFails;
+ private @Metric MutableCounterLong numVolumeDeleteFails;
+ private @Metric MutableCounterLong numBucketCreateFails;
+ private @Metric MutableCounterLong numVolumeCheckAccessFails;
+ private @Metric MutableCounterLong numBucketInfoFails;
+ private @Metric MutableCounterLong numBucketUpdateFails;
+ private @Metric MutableCounterLong numBucketDeleteFails;
+ private @Metric MutableCounterLong numKeyAllocateFails;
+ private @Metric MutableCounterLong numKeyLookupFails;
+ private @Metric MutableCounterLong numKeyRenameFails;
+ private @Metric MutableCounterLong numKeyDeleteFails;
+ private @Metric MutableCounterLong numBucketListFails;
+ private @Metric MutableCounterLong numKeyListFails;
+ private @Metric MutableCounterLong numVolumeListFails;
+ private @Metric MutableCounterLong numKeyCommitFails;
+ private @Metric MutableCounterLong numBlockAllocateCallFails;
+ private @Metric MutableCounterLong numGetServiceListFails;
+
+ public OMMetrics() {
+ }
+
+ public static OMMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(SOURCE_NAME,
+ "Oozne Manager Metrics",
+ new OMMetrics());
+ }
+
+ public void incNumVolumeCreates() {
+ numVolumeOps.incr();
+ numVolumeCreates.incr();
+ }
+
+ public void incNumVolumeUpdates() {
+ numVolumeOps.incr();
+ numVolumeUpdates.incr();
+ }
+
+ public void incNumVolumeInfos() {
+ numVolumeOps.incr();
+ numVolumeInfos.incr();
+ }
+
+ public void incNumVolumeDeletes() {
+ numVolumeOps.incr();
+ numVolumeDeletes.incr();
+ }
+
+ public void incNumVolumeCheckAccesses() {
+ numVolumeOps.incr();
+ numVolumeCheckAccesses.incr();
+ }
+
+ public void incNumBucketCreates() {
+ numBucketOps.incr();
+ numBucketCreates.incr();
+ }
+
+ public void incNumBucketInfos() {
+ numBucketOps.incr();
+ numBucketInfos.incr();
+ }
+
+ public void incNumBucketUpdates() {
+ numBucketOps.incr();
+ numBucketUpdates.incr();
+ }
+
+ public void incNumBucketDeletes() {
+ numBucketOps.incr();
+ numBucketDeletes.incr();
+ }
+
+ public void incNumBucketLists() {
+ numBucketOps.incr();
+ numBucketLists.incr();
+ }
+
+ public void incNumKeyLists() {
+ numKeyOps.incr();
+ numKeyLists.incr();
+ }
+
+ public void incNumVolumeLists() {
+ numVolumeOps.incr();
+ numVolumeLists.incr();
+ }
+
+ public void incNumGetServiceLists() {
+ numGetServiceLists.incr();
+ }
+
+ public void incNumVolumeCreateFails() {
+ numVolumeCreateFails.incr();
+ }
+
+ public void incNumVolumeUpdateFails() {
+ numVolumeUpdateFails.incr();
+ }
+
+ public void incNumVolumeInfoFails() {
+ numVolumeInfoFails.incr();
+ }
+
+ public void incNumVolumeDeleteFails() {
+ numVolumeDeleteFails.incr();
+ }
+
+ public void incNumVolumeCheckAccessFails() {
+ numVolumeCheckAccessFails.incr();
+ }
+
+ public void incNumBucketCreateFails() {
+ numBucketCreateFails.incr();
+ }
+
+ public void incNumBucketInfoFails() {
+ numBucketInfoFails.incr();
+ }
+
+ public void incNumBucketUpdateFails() {
+ numBucketUpdateFails.incr();
+ }
+
+ public void incNumBucketDeleteFails() {
+ numBucketDeleteFails.incr();
+ }
+
+ public void incNumKeyAllocates() {
+ numKeyOps.incr();
+ numKeyAllocate.incr();
+ }
+
+ public void incNumKeyAllocateFails() {
+ numKeyAllocateFails.incr();
+ }
+
+ public void incNumKeyLookups() {
+ numKeyOps.incr();
+ numKeyLookup.incr();
+ }
+
+ public void incNumKeyLookupFails() {
+ numKeyLookupFails.incr();
+ }
+
+ public void incNumKeyRenames() {
+ numKeyOps.incr();
+ numKeyRenames.incr();
+ }
+
+ public void incNumKeyRenameFails() {
+ numKeyOps.incr();
+ numKeyRenameFails.incr();
+ }
+
+ public void incNumKeyDeleteFails() {
+ numKeyDeleteFails.incr();
+ }
+
+ public void incNumKeyDeletes() {
+ numKeyOps.incr();
+ numKeyDeletes.incr();
+ }
+
+ public void incNumKeyCommits() {
+ numKeyOps.incr();
+ numKeyCommits.incr();
+ }
+
+ public void incNumKeyCommitFails() {
+ numKeyCommitFails.incr();
+ }
+
+ public void incNumBlockAllocateCalls() {
+ numAllocateBlockCalls.incr();
+ }
+
+ public void incNumBlockAllocateCallFails() {
+ numBlockAllocateCallFails.incr();
+ }
+
+ public void incNumBucketListFails() {
+ numBucketListFails.incr();
+ }
+
+ public void incNumKeyListFails() {
+ numKeyListFails.incr();
+ }
+
+ public void incNumVolumeListFails() {
+ numVolumeListFails.incr();
+ }
+
+ public void incNumGetServiceListFails() {
+ numGetServiceListFails.incr();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeCreates() {
+ return numVolumeCreates.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeUpdates() {
+ return numVolumeUpdates.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeInfos() {
+ return numVolumeInfos.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeDeletes() {
+ return numVolumeDeletes.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeCheckAccesses() {
+ return numVolumeCheckAccesses.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketCreates() {
+ return numBucketCreates.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketInfos() {
+ return numBucketInfos.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketUpdates() {
+ return numBucketUpdates.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketDeletes() {
+ return numBucketDeletes.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketLists() {
+ return numBucketLists.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeLists() {
+ return numVolumeLists.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyLists() {
+ return numKeyLists.value();
+ }
+
+ @VisibleForTesting
+ public long getNumGetServiceLists() {
+ return numGetServiceLists.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeCreateFails() {
+ return numVolumeCreateFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeUpdateFails() {
+ return numVolumeUpdateFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeInfoFails() {
+ return numVolumeInfoFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeDeleteFails() {
+ return numVolumeDeleteFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeCheckAccessFails() {
+ return numVolumeCheckAccessFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketCreateFails() {
+ return numBucketCreateFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketInfoFails() {
+ return numBucketInfoFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketUpdateFails() {
+ return numBucketUpdateFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketDeleteFails() {
+ return numBucketDeleteFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyAllocates() {
+ return numKeyAllocate.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyAllocateFails() {
+ return numKeyAllocateFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyLookups() {
+ return numKeyLookup.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyLookupFails() {
+ return numKeyLookupFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyRenames() {
+ return numKeyRenames.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyRenameFails() {
+ return numKeyRenameFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyDeletes() {
+ return numKeyDeletes.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyDeletesFails() {
+ return numKeyDeleteFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBucketListFails() {
+ return numBucketListFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyListFails() {
+ return numKeyListFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumVolumeListFails() {
+ return numVolumeListFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyCommits() {
+ return numKeyCommits.value();
+ }
+
+ @VisibleForTesting
+ public long getNumKeyCommitFails() {
+ return numKeyCommitFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBlockAllocates() {
+ return numAllocateBlockCalls.value();
+ }
+
+ @VisibleForTesting
+ public long getNumBlockAllocateFails() {
+ return numBlockAllocateCallFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumGetServiceListFails() {
+ return numGetServiceListFails.value();
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
new file mode 100644
index 0000000..3820aed
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+
+/**
+ * OMStorage is responsible for management of the StorageDirectories used by
+ * the Ozone Manager.
+ */
+public class OMStorage extends Storage {
+
+ public static final String STORAGE_DIR = "om";
+ public static final String OM_ID = "omUuid";
+
+ /**
+ * Construct OMStorage.
+ * @throws IOException if any directories are inaccessible.
+ */
+ public OMStorage(OzoneConfiguration conf) throws IOException {
+ super(NodeType.OM, getOzoneMetaDirPath(conf), STORAGE_DIR);
+ }
+
+ public void setScmId(String scmId) throws IOException {
+ if (getState() == StorageState.INITIALIZED) {
+ throw new IOException("OM is already initialized.");
+ } else {
+ getStorageInfo().setProperty(SCM_ID, scmId);
+ }
+ }
+
+ public void setOmId(String omId) throws IOException {
+ if (getState() == StorageState.INITIALIZED) {
+ throw new IOException("OM is already initialized.");
+ } else {
+ getStorageInfo().setProperty(OM_ID, omId);
+ }
+ }
+
+ /**
+ * Retrieves the SCM ID from the version file.
+ * @return SCM_ID
+ */
+ public String getScmId() {
+ return getStorageInfo().getProperty(SCM_ID);
+ }
+
+ /**
+ * Retrieves the OM ID from the version file.
+ * @return OM_ID
+ */
+ public String getOmId() {
+ return getStorageInfo().getProperty(OM_ID);
+ }
+
+ @Override
+ protected Properties getNodeProperties() {
+ String omId = getOmId();
+ if (omId == null) {
+ omId = UUID.randomUUID().toString();
+ }
+ Properties omProperties = new Properties();
+ omProperties.setProperty(OM_ID, omId);
+ return omProperties;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
new file mode 100644
index 0000000..21d2411
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
+import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+
+/**
+ * Ozone metadata manager interface.
+ */
+public class OmMetadataManagerImpl implements OMMetadataManager {
+
+ private final MetadataStore store;
+ private final ReadWriteLock lock;
+ private final long openKeyExpireThresholdMS;
+
+ public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
+ File metaDir = getOzoneMetaDirPath(conf);
+ final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
+ OZONE_OM_DB_CACHE_SIZE_DEFAULT);
+ File omDBFile = new File(metaDir.getPath(), OM_DB_NAME);
+ this.store = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(omDBFile)
+ .setCacheSize(cacheSize * OzoneConsts.MB)
+ .build();
+ this.lock = new ReentrantReadWriteLock();
+ this.openKeyExpireThresholdMS = 1000 * conf.getInt(
+ OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
+ OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
+ }
+
+ /**
+ * Start metadata manager.
+ */
+ @Override
+ public void start() {
+
+ }
+
+ /**
+ * Stop metadata manager.
+ */
+ @Override
+ public void stop() throws IOException {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ /**
+ * Get metadata store.
+ * @return store - metadata store.
+ */
+ @VisibleForTesting
+ @Override
+ public MetadataStore getStore() {
+ return store;
+ }
+
+ /**
+ * Given a volume return the corresponding DB key.
+ * @param volume - Volume name
+ */
+ public byte[] getVolumeKey(String volume) {
+ String dbVolumeName = OzoneConsts.OM_VOLUME_PREFIX + volume;
+ return DFSUtil.string2Bytes(dbVolumeName);
+ }
+
+ /**
+ * Given a user return the corresponding DB key.
+ * @param user - User name
+ */
+ public byte[] getUserKey(String user) {
+ String dbUserName = OzoneConsts.OM_USER_PREFIX + user;
+ return DFSUtil.string2Bytes(dbUserName);
+ }
+
+ /**
+ * Given a volume and bucket, return the corresponding DB key.
+ * @param volume - User name
+ * @param bucket - Bucket name
+ */
+ public byte[] getBucketKey(String volume, String bucket) {
+ String bucketKeyString = OzoneConsts.OM_VOLUME_PREFIX + volume
+ + OzoneConsts.OM_BUCKET_PREFIX + bucket;
+ return DFSUtil.string2Bytes(bucketKeyString);
+ }
+
+ /**
+ * @param volume
+ * @param bucket
+ * @return
+ */
+ private String getBucketWithDBPrefix(String volume, String bucket) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(OzoneConsts.OM_VOLUME_PREFIX)
+ .append(volume)
+ .append(OzoneConsts.OM_BUCKET_PREFIX);
+ if (!Strings.isNullOrEmpty(bucket)) {
+ sb.append(bucket);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public String getKeyWithDBPrefix(String volume, String bucket, String key) {
+ String keyVB = OzoneConsts.OM_KEY_PREFIX + volume
+ + OzoneConsts.OM_KEY_PREFIX + bucket
+ + OzoneConsts.OM_KEY_PREFIX;
+ return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
+ }
+
+ @Override
+ public byte[] getDBKeyBytes(String volume, String bucket, String key) {
+ return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key));
+ }
+
+ @Override
+ public byte[] getDeletedKeyName(byte[] keyName) {
+ return DFSUtil.string2Bytes(
+ DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
+ }
+
+ @Override
+ public byte[] getOpenKeyNameBytes(String keyName, int id) {
+ return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
+ OPEN_KEY_ID_DELIMINATOR + keyName);
+ }
+
+ /**
+ * Returns the read lock used on Metadata DB.
+ * @return readLock
+ */
+ @Override
+ public Lock readLock() {
+ return lock.readLock();
+ }
+
+ /**
+ * Returns the write lock used on Metadata DB.
+ * @return writeLock
+ */
+ @Override
+ public Lock writeLock() {
+ return lock.writeLock();
+ }
+
+ /**
+ * Returns the value associated with this key.
+ * @param key - key
+ * @return value
+ */
+ @Override
+ public byte[] get(byte[] key) throws IOException {
+ return store.get(key);
+ }
+
+ /**
+ * Puts a Key into Metadata DB.
+ * @param key - key
+ * @param value - value
+ */
+ @Override
+ public void put(byte[] key, byte[] value) throws IOException {
+ store.put(key, value);
+ }
+
+ /**
+ * Deletes a Key from Metadata DB.
+ * @param key - key
+ */
+ public void delete(byte[] key) throws IOException {
+ store.delete(key);
+ }
+
+ @Override
+ public void writeBatch(BatchOperation batch) throws IOException {
+ this.store.writeBatch(batch);
+ }
+
+ /**
+ * Given a volume, check if it is empty, i.e there are no buckets inside it.
+ * @param volume - Volume name
+ * @return true if the volume is empty
+ */
+ public boolean isVolumeEmpty(String volume) throws IOException {
+ String dbVolumeRootName = OzoneConsts.OM_VOLUME_PREFIX + volume
+ + OzoneConsts.OM_BUCKET_PREFIX;
+ byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
+ ImmutablePair<byte[], byte[]> volumeRoot =
+ store.peekAround(0, dbVolumeRootKey);
+ if (volumeRoot != null) {
+ return !DFSUtil.bytes2String(volumeRoot.getKey())
+ .startsWith(dbVolumeRootName);
+ }
+ return true;
+ }
+
+ /**
+ * Given a volume/bucket, check if it is empty,
+ * i.e there are no keys inside it.
+ * @param volume - Volume name
+ * @param bucket - Bucket name
+ * @return true if the bucket is empty
+ */
+ public boolean isBucketEmpty(String volume, String bucket)
+ throws IOException {
+ String keyRootName = getKeyWithDBPrefix(volume, bucket, null);
+ byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
+ ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
+ if (firstKey != null) {
+ return !DFSUtil.bytes2String(firstKey.getKey())
+ .startsWith(keyRootName);
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<OmBucketInfo> listBuckets(final String volumeName,
+ final String startBucket, final String bucketPrefix,
+ final int maxNumOfBuckets) throws IOException {
+ List<OmBucketInfo> result = new ArrayList<>();
+ if (Strings.isNullOrEmpty(volumeName)) {
+ throw new OMException("Volume name is required.",
+ ResultCodes.FAILED_VOLUME_NOT_FOUND);
+ }
+
+ byte[] volumeNameBytes = getVolumeKey(volumeName);
+ if (store.get(volumeNameBytes) == null) {
+ throw new OMException("Volume " + volumeName + " not found.",
+ ResultCodes.FAILED_VOLUME_NOT_FOUND);
+ }
+
+
+ // A bucket starts with /#volume/#bucket_prefix
+ MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
+ if (currentKey != null) {
+ String bucketNamePrefix =
+ getBucketWithDBPrefix(volumeName, bucketPrefix);
+ String bucket = DFSUtil.bytes2String(currentKey);
+ return bucket.startsWith(bucketNamePrefix);
+ }
+ return false;
+ };
+
+ List<Map.Entry<byte[], byte[]>> rangeResult;
+ if (!Strings.isNullOrEmpty(startBucket)) {
+ // Since we are excluding start key from the result,
+ // the maxNumOfBuckets is incremented.
+ rangeResult = store.getSequentialRangeKVs(
+ getBucketKey(volumeName, startBucket),
+ maxNumOfBuckets + 1, filter);
+ if (!rangeResult.isEmpty()) {
+ //Remove start key from result.
+ rangeResult.remove(0);
+ }
+ } else {
+ rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
+ }
+
+ for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+ OmBucketInfo info = OmBucketInfo.getFromProtobuf(
+ BucketInfo.parseFrom(entry.getValue()));
+ result.add(info);
+ }
+ return result;
+ }
+
+ @Override
+ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+ String startKey, String keyPrefix, int maxKeys) throws IOException {
+ List<OmKeyInfo> result = new ArrayList<>();
+ if (Strings.isNullOrEmpty(volumeName)) {
+ throw new OMException("Volume name is required.",
+ ResultCodes.FAILED_VOLUME_NOT_FOUND);
+ }
+
+ if (Strings.isNullOrEmpty(bucketName)) {
+ throw new OMException("Bucket name is required.",
+ ResultCodes.FAILED_BUCKET_NOT_FOUND);
+ }
+
+ byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
+ if (store.get(bucketNameBytes) == null) {
+ throw new OMException("Bucket " + bucketName + " not found.",
+ ResultCodes.FAILED_BUCKET_NOT_FOUND);
+ }
+
+ MetadataKeyFilter filter = new KeyPrefixFilter()
+ .addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
+
+ List<Map.Entry<byte[], byte[]>> rangeResult;
+ if (!Strings.isNullOrEmpty(startKey)) {
+ //Since we are excluding start key from the result,
+ // the maxNumOfBuckets is incremented.
+ rangeResult = store.getSequentialRangeKVs(
+ getDBKeyBytes(volumeName, bucketName, startKey),
+ maxKeys + 1, filter);
+ if (!rangeResult.isEmpty()) {
+ //Remove start key from result.
+ rangeResult.remove(0);
+ }
+ } else {
+ rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
+ }
+
+ for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+ OmKeyInfo info = OmKeyInfo.getFromProtobuf(
+ KeyInfo.parseFrom(entry.getValue()));
+ result.add(info);
+ }
+ return result;
+ }
+
+ @Override
+ public List<OmVolumeArgs> listVolumes(String userName,
+ String prefix, String startKey, int maxKeys) throws IOException {
+ List<OmVolumeArgs> result = Lists.newArrayList();
+ VolumeList volumes;
+ if (Strings.isNullOrEmpty(userName)) {
+ volumes = getAllVolumes();
+ } else {
+ volumes = getVolumesByUser(userName);
+ }
+
+ if (volumes == null || volumes.getVolumeNamesCount() == 0) {
+ return result;
+ }
+
+ boolean startKeyFound = Strings.isNullOrEmpty(startKey);
+ for (String volumeName : volumes.getVolumeNamesList()) {
+ if (!Strings.isNullOrEmpty(prefix)) {
+ if (!volumeName.startsWith(prefix)) {
+ continue;
+ }
+ }
+
+ if (!startKeyFound && volumeName.equals(startKey)) {
+ startKeyFound = true;
+ continue;
+ }
+ if (startKeyFound && result.size() < maxKeys) {
+ byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
+ if (volumeInfo == null) {
+ // Could not get volume info by given volume name,
+ // since the volume name is loaded from db,
+ // this probably means om db is corrupted or some entries are
+ // accidentally removed.
+ throw new OMException("Volume info not found for " + volumeName,
+ ResultCodes.FAILED_VOLUME_NOT_FOUND);
+ }
+ VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
+ OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(info);
+ result.add(volumeArgs);
+ }
+ }
+
+ return result;
+ }
+
+ private VolumeList getVolumesByUser(String userName)
+ throws OMException {
+ return getVolumesByUser(getUserKey(userName));
+ }
+
+ private VolumeList getVolumesByUser(byte[] userNameKey)
+ throws OMException {
+ VolumeList volumes = null;
+ try {
+ byte[] volumesInBytes = store.get(userNameKey);
+ if (volumesInBytes == null) {
+ // No volume found for this user, return an empty list
+ return VolumeList.newBuilder().build();
+ }
+ volumes = VolumeList.parseFrom(volumesInBytes);
+ } catch (IOException e) {
+ throw new OMException("Unable to get volumes info by the given user, "
+ + "metadata might be corrupted", e,
+ ResultCodes.FAILED_METADATA_ERROR);
+ }
+ return volumes;
+ }
+
+ private VolumeList getAllVolumes() throws IOException {
+ // Scan all users in database
+ KeyPrefixFilter filter =
+ new KeyPrefixFilter().addFilter(OzoneConsts.OM_USER_PREFIX);
+ // We are not expecting a huge number of users per cluster,
+ // it should be fine to scan all users in db and return us a
+ // list of volume names in string per user.
+ List<Map.Entry<byte[], byte[]>> rangeKVs = store
+ .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter);
+
+ VolumeList.Builder builder = VolumeList.newBuilder();
+ for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
+ VolumeList volumes = this.getVolumesByUser(entry.getKey());
+ builder.addAllVolumeNames(volumes.getVolumeNamesList());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public List<BlockGroup> getPendingDeletionKeys(final int count)
+ throws IOException {
+ List<BlockGroup> keyBlocksList = Lists.newArrayList();
+ List<Map.Entry<byte[], byte[]>> rangeResult =
+ store.getRangeKVs(null, count,
+ MetadataKeyFilters.getDeletingKeyFilter());
+ for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+ OmKeyInfo info =
+ OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+ // Get block keys as a list.
+ OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
+ if (latest == null) {
+ return Collections.emptyList();
+ }
+ List<BlockID> item = latest.getLocationList().stream()
+ .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
+ .collect(Collectors.toList());
+ BlockGroup keyBlocks = BlockGroup.newBuilder()
+ .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+ .addAllBlockIDs(item)
+ .build();
+ keyBlocksList.add(keyBlocks);
+ }
+ return keyBlocksList;
+ }
+
+ @Override
+ public List<BlockGroup> getExpiredOpenKeys() throws IOException {
+ List<BlockGroup> keyBlocksList = Lists.newArrayList();
+ long now = Time.now();
+ final MetadataKeyFilter openKeyFilter =
+ new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX);
+ List<Map.Entry<byte[], byte[]>> rangeResult =
+ store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
+ openKeyFilter);
+ for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+ OmKeyInfo info =
+ OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+ long lastModify = info.getModificationTime();
+ if (now - lastModify < this.openKeyExpireThresholdMS) {
+ // consider as may still be active, not hanging.
+ continue;
+ }
+ // Get block keys as a list.
+ List<BlockID> item = info.getLatestVersionLocations()
+ .getBlocksLatestVersionOnly().stream()
+ .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
+ .collect(Collectors.toList());
+ BlockGroup keyBlocks = BlockGroup.newBuilder()
+ .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+ .addAllBlockIDs(item)
+ .build();
+ keyBlocksList.add(keyBlocks);
+ }
+ return keyBlocksList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
new file mode 100644
index 0000000..8d94f5a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is the background service to delete hanging open keys.
+ * Scan the metadata of om periodically to get
+ * the keys with prefix "#open#" and ask scm to
+ * delete metadata accordingly, if scm returns
+ * success for keys, then clean up those keys.
+ */
+public class OpenKeyCleanupService extends BackgroundService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OpenKeyCleanupService.class);
+
+ private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
+
+ private final KeyManager keyManager;
+ private final ScmBlockLocationProtocol scmClient;
+
+ public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
+ KeyManager keyManager, int serviceInterval,
+ long serviceTimeout) {
+ super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
+ OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+ this.keyManager = keyManager;
+ this.scmClient = scmClient;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new OpenKeyDeletingTask());
+ return queue;
+ }
+
+ private class OpenKeyDeletingTask
+ implements BackgroundTask<BackgroundTaskResult> {
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ try {
+ List<BlockGroup> keyBlocksList = keyManager.getExpiredOpenKeys();
+ if (keyBlocksList.size() > 0) {
+ int toDeleteSize = keyBlocksList.size();
+ LOG.debug("Found {} to-delete open keys in OM", toDeleteSize);
+ List<DeleteBlockGroupResult> results =
+ scmClient.deleteKeyBlocks(keyBlocksList);
+ int deletedSize = 0;
+ for (DeleteBlockGroupResult result : results) {
+ if (result.isSuccess()) {
+ try {
+ keyManager.deleteExpiredOpenKey(result.getObjectKey());
+ LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+ deletedSize += 1;
+ } catch (IOException e) {
+ LOG.warn("Failed to delete hanging-open key {}",
+ result.getObjectKey(), e);
+ }
+ } else {
+ LOG.warn("Deleting open Key {} failed because some of the blocks"
+ + " were failed to delete, failed blocks: {}",
+ result.getObjectKey(),
+ StringUtils.join(",", result.getFailedBlocks()));
+ }
+ }
+ LOG.info("Found {} expired open key entries, successfully " +
+ "cleaned up {} entries", toDeleteSize, deletedSize);
+ return results::size;
+ } else {
+ LOG.debug("No hanging open key found in OM");
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to get hanging open keys, retry in"
+ + " next interval", e);
+ }
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
new file mode 100644
index 0000000..71fa921
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -0,0 +1,911 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .ServicePort;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB
+ .ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.protocolPB
+ .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
+import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
+import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
+import static org.apache.hadoop.hdds.server.ServerUtils
+ .updateRPCListenAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OzoneManagerService
+ .newReflectiveBlockingService;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+ .NodeState.HEALTHY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+/**
+ * Ozone Manager is the metadata manager of ozone.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
+public final class OzoneManager extends ServiceRuntimeInfoImpl
+ implements OzoneManagerProtocol, OMMXBean {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OzoneManager.class);
+
+ private static final String USAGE =
+ "Usage: \n ozone om [genericOptions] " + "[ "
+ + StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "ozone om [ "
+ + StartupOption.HELP.getName() + " ]\n";
+
+ /** Startup options. */
+ public enum StartupOption {
+ CREATEOBJECTSTORE("-createObjectStore"),
+ HELP("-help"),
+ REGULAR("-regular");
+
+ private final String name;
+
+ StartupOption(String arg) {
+ this.name = arg;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static StartupOption parse(String value) {
+ for (StartupOption option : StartupOption.values()) {
+ if (option.name.equalsIgnoreCase(value)) {
+ return option;
+ }
+ }
+ return null;
+ }
+ }
+
+ private final OzoneConfiguration configuration;
+ private final RPC.Server omRpcServer;
+ private final InetSocketAddress omRpcAddress;
+ private final OMMetadataManager metadataManager;
+ private final VolumeManager volumeManager;
+ private final BucketManager bucketManager;
+ private final KeyManager keyManager;
+ private final OMMetrics metrics;
+ private final OzoneManagerHttpServer httpServer;
+ private final OMStorage omStorage;
+ private final ScmBlockLocationProtocol scmBlockClient;
+ private final StorageContainerLocationProtocol scmContainerClient;
+ private ObjectName omInfoBeanName;
+
+ private OzoneManager(OzoneConfiguration conf) throws IOException {
+ Preconditions.checkNotNull(conf);
+ configuration = conf;
+ omStorage = new OMStorage(conf);
+ scmBlockClient = getScmBlockClient(configuration);
+ scmContainerClient = getScmContainerClient(configuration);
+ if (omStorage.getState() != StorageState.INITIALIZED) {
+ throw new OMException("OM not initialized.",
+ ResultCodes.OM_NOT_INITIALIZED);
+ }
+
+ // verifies that the SCM info in the OM Version file is correct.
+ ScmInfo scmInfo = scmBlockClient.getScmInfo();
+ if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
+ .getScmId().equals(omStorage.getScmId()))) {
+ throw new OMException("SCM version info mismatch.",
+ ResultCodes.SCM_VERSION_MISMATCH_ERROR);
+ }
+ final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+ OZONE_OM_HANDLER_COUNT_DEFAULT);
+
+ RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ BlockingService omService = newReflectiveBlockingService(
+ new OzoneManagerProtocolServerSideTranslatorPB(this));
+ final InetSocketAddress omNodeRpcAddr =
+ getOmAddress(configuration);
+ omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
+ OzoneManagerProtocolPB.class, omService,
+ handlerCount);
+ omRpcAddress = updateRPCListenAddress(configuration,
+ OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+ metadataManager = new OmMetadataManagerImpl(configuration);
+ volumeManager = new VolumeManagerImpl(metadataManager, configuration);
+ bucketManager = new BucketManagerImpl(metadataManager);
+ metrics = OMMetrics.create();
+ keyManager =
+ new KeyManagerImpl(scmBlockClient, metadataManager, configuration,
+ omStorage.getOmId());
+ httpServer = new OzoneManagerHttpServer(configuration, this);
+ }
+
+ /**
+ * Create a scm block client, used by putKey() and getKey().
+ *
+ * @return {@link ScmBlockLocationProtocol}
+ * @throws IOException
+ */
+ private static ScmBlockLocationProtocol getScmBlockClient(
+ OzoneConfiguration conf) throws IOException {
+ RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long scmVersion =
+ RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
+ InetSocketAddress scmBlockAddress =
+ getScmAddressForBlockClients(conf);
+ ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+ new ScmBlockLocationProtocolClientSideTranslatorPB(
+ RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+ scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf)));
+ return scmBlockLocationClient;
+ }
+
+ /**
+ * Returns a scm container client.
+ *
+ * @return {@link StorageContainerLocationProtocol}
+ * @throws IOException
+ */
+ private static StorageContainerLocationProtocol getScmContainerClient(
+ OzoneConfiguration conf) throws IOException {
+ RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long scmVersion =
+ RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+ InetSocketAddress scmAddr = getScmAddressForClients(
+ conf);
+ StorageContainerLocationProtocolClientSideTranslatorPB scmContainerClient =
+ new StorageContainerLocationProtocolClientSideTranslatorPB(
+ RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
+ scmAddr, UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf)));
+ return scmContainerClient;
+ }
+
+ @VisibleForTesting
+ public KeyManager getKeyManager() {
+ return keyManager;
+ }
+
+ @VisibleForTesting
+ public ScmInfo getScmInfo() throws IOException {
+ return scmBlockClient.getScmInfo();
+ }
+
+ @VisibleForTesting
+ public OMStorage getOmStorage() {
+ return omStorage;
+ }
+ /**
+ * Starts an RPC server, if configured.
+ *
+ * @param conf configuration
+ * @param addr configured address of RPC server
+ * @param protocol RPC protocol provided by RPC server
+ * @param instance RPC protocol implementation instance
+ * @param handlerCount RPC server handler count
+ *
+ * @return RPC server
+ * @throws IOException if there is an I/O error while creating RPC server
+ */
+ private static RPC.Server startRpcServer(OzoneConfiguration conf,
+ InetSocketAddress addr, Class<?> protocol, BlockingService instance,
+ int handlerCount) throws IOException {
+ RPC.Server rpcServer = new RPC.Builder(conf)
+ .setProtocol(protocol)
+ .setInstance(instance)
+ .setBindAddress(addr.getHostString())
+ .setPort(addr.getPort())
+ .setNumHandlers(handlerCount)
+ .setVerbose(false)
+ .setSecretManager(null)
+ .build();
+
+ DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+ return rpcServer;
+ }
+
+ /**
+ * Get metadata manager.
+ * @return metadata manager.
+ */
+ public OMMetadataManager getMetadataManager() {
+ return metadataManager;
+ }
+
+ public OMMetrics getMetrics() {
+ return metrics;
+ }
+
+ /**
+ * Main entry point for starting OzoneManager.
+ *
+ * @param argv arguments
+ * @throws IOException if startup fails due to I/O error
+ */
+ public static void main(String[] argv) throws IOException {
+ if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
+ System.exit(0);
+ }
+ try {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
+ if (!hParser.isParseSuccessful()) {
+ System.err.println("USAGE: " + USAGE + " \n");
+ hParser.printGenericCommandUsage(System.err);
+ System.exit(1);
+ }
+ StringUtils.startupShutdownMessage(OzoneManager.class, argv, LOG);
+ OzoneManager om = createOm(hParser.getRemainingArgs(), conf);
+ if (om != null) {
+ om.start();
+ om.join();
+ }
+ } catch (Throwable t) {
+ LOG.error("Failed to start the OzoneManager.", t);
+ terminate(1, t);
+ }
+ }
+
+ private static void printUsage(PrintStream out) {
+ out.println(USAGE + "\n");
+ }
+
+ /**
+ * Constructs OM instance based on command line arguments.
+ * @param argv Command line arguments
+ * @param conf OzoneConfiguration
+ * @return OM instance
+ * @throws IOException in case OM instance creation fails.
+ */
+
+ public static OzoneManager createOm(String[] argv,
+ OzoneConfiguration conf) throws IOException {
+ if (!isHddsEnabled(conf)) {
+ System.err.println("OM cannot be started in secure mode or when " +
+ OZONE_ENABLED + " is set to false");
+ System.exit(1);
+ }
+ StartupOption startOpt = parseArguments(argv);
+ if (startOpt == null) {
+ printUsage(System.err);
+ terminate(1);
+ return null;
+ }
+ switch (startOpt) {
+ case CREATEOBJECTSTORE:
+ terminate(omInit(conf) ? 0 : 1);
+ return null;
+ case HELP:
+ printUsage(System.err);
+ terminate(0);
+ return null;
+ default:
+ return new OzoneManager(conf);
+ }
+ }
+
+ /**
+ * Initializes the OM instance.
+ * @param conf OzoneConfiguration
+ * @return true if OM initialization succeeds, false otherwise
+ * @throws IOException in case ozone metadata directory path is not accessible
+ */
+
+ private static boolean omInit(OzoneConfiguration conf) throws IOException {
+ OMStorage omStorage = new OMStorage(conf);
+ StorageState state = omStorage.getState();
+ if (state != StorageState.INITIALIZED) {
+ try {
+ ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
+ ScmInfo scmInfo = scmBlockClient.getScmInfo();
+ String clusterId = scmInfo.getClusterId();
+ String scmId = scmInfo.getScmId();
+ if (clusterId == null || clusterId.isEmpty()) {
+ throw new IOException("Invalid Cluster ID");
+ }
+ if (scmId == null || scmId.isEmpty()) {
+ throw new IOException("Invalid SCM ID");
+ }
+ omStorage.setClusterId(clusterId);
+ omStorage.setScmId(scmId);
+ omStorage.initialize();
+ System.out.println(
+ "OM initialization succeeded.Current cluster id for sd="
+ + omStorage.getStorageDir() + ";cid=" + omStorage
+ .getClusterID());
+ return true;
+ } catch (IOException ioe) {
+ LOG.error("Could not initialize OM version file", ioe);
+ return false;
+ }
+ } else {
+ System.out.println(
+ "OM already initialized.Reusing existing cluster id for sd="
+ + omStorage.getStorageDir() + ";cid=" + omStorage
+ .getClusterID());
+ return true;
+ }
+ }
+
+ /**
+ * Parses the command line options for OM initialization.
+ * @param args command line arguments
+ * @return StartupOption if options are valid, null otherwise
+ */
+ private static StartupOption parseArguments(String[] args) {
+ if (args == null || args.length == 0) {
+ return StartupOption.REGULAR;
+ } else if (args.length == 1) {
+ return StartupOption.parse(args[0]);
+ }
+ return null;
+ }
+
+ /**
+ * Builds a message for logging startup information about an RPC server.
+ *
+ * @param description RPC server description
+ * @param addr RPC server listening address
+ * @return server startup message
+ */
+ private static String buildRpcServerStartMessage(String description,
+ InetSocketAddress addr) {
+ return addr != null ? String.format("%s is listening at %s",
+ description, addr.toString()) :
+ String.format("%s not started", description);
+ }
+
+ /**
+ * Start service.
+ */
+ public void start() throws IOException {
+ LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
+ omRpcAddress));
+ DefaultMetricsSystem.initialize("OzoneManager");
+ metadataManager.start();
+ keyManager.start();
+ omRpcServer.start();
+ httpServer.start();
+ registerMXBean();
+ setStartTime();
+ }
+
+ /**
+ * Stop service.
+ */
+ public void stop() {
+ try {
+ metadataManager.stop();
+ omRpcServer.stop();
+ keyManager.stop();
+ httpServer.stop();
+ metrics.unRegister();
+ unregisterMXBean();
+ } catch (Exception e) {
+ LOG.error("OzoneManager stop failed.", e);
+ }
+ }
+
+ /**
+ * Wait until service has completed shutdown.
+ */
+ public void join() {
+ try {
+ omRpcServer.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("Interrupted during OzoneManager join.", e);
+ }
+ }
+
+ /**
+ * Creates a volume.
+ *
+ * @param args - Arguments to create Volume.
+ * @throws IOException
+ */
+ @Override
+ public void createVolume(OmVolumeArgs args) throws IOException {
+ try {
+ metrics.incNumVolumeCreates();
+ volumeManager.createVolume(args);
+ } catch (Exception ex) {
+ metrics.incNumVolumeCreateFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Changes the owner of a volume.
+ *
+ * @param volume - Name of the volume.
+ * @param owner - Name of the owner.
+ * @throws IOException
+ */
+ @Override
+ public void setOwner(String volume, String owner) throws IOException {
+ try {
+ metrics.incNumVolumeUpdates();
+ volumeManager.setOwner(volume, owner);
+ } catch (Exception ex) {
+ metrics.incNumVolumeUpdateFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Changes the Quota on a volume.
+ *
+ * @param volume - Name of the volume.
+ * @param quota - Quota in bytes.
+ * @throws IOException
+ */
+ @Override
+ public void setQuota(String volume, long quota) throws IOException {
+ try {
+ metrics.incNumVolumeUpdates();
+ volumeManager.setQuota(volume, quota);
+ } catch (Exception ex) {
+ metrics.incNumVolumeUpdateFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Checks if the specified user can access this volume.
+ *
+ * @param volume - volume
+ * @param userAcl - user acls which needs to be checked for access
+ * @return true if the user has required access for the volume,
+ * false otherwise
+ * @throws IOException
+ */
+ @Override
+ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+ throws IOException {
+ try {
+ metrics.incNumVolumeCheckAccesses();
+ return volumeManager.checkVolumeAccess(volume, userAcl);
+ } catch (Exception ex) {
+ metrics.incNumVolumeCheckAccessFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Gets the volume information.
+ *
+ * @param volume - Volume name.
+ * @return VolumeArgs or exception is thrown.
+ * @throws IOException
+ */
+ @Override
+ public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
+ try {
+ metrics.incNumVolumeInfos();
+ return volumeManager.getVolumeInfo(volume);
+ } catch (Exception ex) {
+ metrics.incNumVolumeInfoFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Deletes an existing empty volume.
+ *
+ * @param volume - Name of the volume.
+ * @throws IOException
+ */
+ @Override
+ public void deleteVolume(String volume) throws IOException {
+ try {
+ metrics.incNumVolumeDeletes();
+ volumeManager.deleteVolume(volume);
+ } catch (Exception ex) {
+ metrics.incNumVolumeDeleteFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Lists volume owned by a specific user.
+ *
+ * @param userName - user name
+ * @param prefix - Filter prefix -- Return only entries that match this.
+ * @param prevKey - Previous key -- List starts from the next from the
+ * prevkey
+ * @param maxKeys - Max number of keys to return.
+ * @return List of Volumes.
+ * @throws IOException
+ */
+ @Override
+ public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
+ String prevKey, int maxKeys) throws IOException {
+ try {
+ metrics.incNumVolumeLists();
+ return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
+ } catch (Exception ex) {
+ metrics.incNumVolumeListFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Lists volume all volumes in the cluster.
+ *
+ * @param prefix - Filter prefix -- Return only entries that match this.
+ * @param prevKey - Previous key -- List starts from the next from the
+ * prevkey
+ * @param maxKeys - Max number of keys to return.
+ * @return List of Volumes.
+ * @throws IOException
+ */
+ @Override
+ public List<OmVolumeArgs> listAllVolumes(String prefix, String prevKey, int
+ maxKeys) throws IOException {
+ try {
+ metrics.incNumVolumeLists();
+ return volumeManager.listVolumes(null, prefix, prevKey, maxKeys);
+ } catch (Exception ex) {
+ metrics.incNumVolumeListFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Creates a bucket.
+ *
+ * @param bucketInfo - BucketInfo to create bucket.
+ * @throws IOException
+ */
+ @Override
+ public void createBucket(OmBucketInfo bucketInfo) throws IOException {
+ try {
+ metrics.incNumBucketCreates();
+ bucketManager.createBucket(bucketInfo);
+ } catch (Exception ex) {
+ metrics.incNumBucketCreateFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<OmBucketInfo> listBuckets(String volumeName,
+ String startKey, String prefix, int maxNumOfBuckets)
+ throws IOException {
+ try {
+ metrics.incNumBucketLists();
+ return bucketManager.listBuckets(volumeName,
+ startKey, prefix, maxNumOfBuckets);
+ } catch (IOException ex) {
+ metrics.incNumBucketListFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Gets the bucket information.
+ *
+ * @param volume - Volume name.
+ * @param bucket - Bucket name.
+ * @return OmBucketInfo or exception is thrown.
+ * @throws IOException
+ */
+ @Override
+ public OmBucketInfo getBucketInfo(String volume, String bucket)
+ throws IOException {
+ try {
+ metrics.incNumBucketInfos();
+ return bucketManager.getBucketInfo(volume, bucket);
+ } catch (Exception ex) {
+ metrics.incNumBucketInfoFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Allocate a key.
+ *
+ * @param args - attributes of the key.
+ * @return OmKeyInfo - the info about the allocated key.
+ * @throws IOException
+ */
+ @Override
+ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
+ try {
+ metrics.incNumKeyAllocates();
+ return keyManager.openKey(args);
+ } catch (Exception ex) {
+ metrics.incNumKeyAllocateFails();
+ throw ex;
+ }
+ }
+
+ @Override
+ public void commitKey(OmKeyArgs args, int clientID)
+ throws IOException {
+ try {
+ metrics.incNumKeyCommits();
+ keyManager.commitKey(args, clientID);
+ } catch (Exception ex) {
+ metrics.incNumKeyCommitFails();
+ throw ex;
+ }
+ }
+
+ @Override
+ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+ throws IOException {
+ try {
+ metrics.incNumBlockAllocateCalls();
+ return keyManager.allocateBlock(args, clientID);
+ } catch (Exception ex) {
+ metrics.incNumBlockAllocateCallFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Lookup a key.
+ *
+ * @param args - attributes of the key.
+ * @return OmKeyInfo - the info about the requested key.
+ * @throws IOException
+ */
+ @Override
+ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+ try {
+ metrics.incNumKeyLookups();
+ return keyManager.lookupKey(args);
+ } catch (Exception ex) {
+ metrics.incNumKeyLookupFails();
+ throw ex;
+ }
+ }
+
+ @Override
+ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
+ try {
+ metrics.incNumKeyRenames();
+ keyManager.renameKey(args, toKeyName);
+ } catch (IOException e) {
+ metrics.incNumKeyRenameFails();
+ throw e;
+ }
+ }
+
+ /**
+ * Deletes an existing key.
+ *
+ * @param args - attributes of the key.
+ * @throws IOException
+ */
+ @Override
+ public void deleteKey(OmKeyArgs args) throws IOException {
+ try {
+ metrics.incNumKeyDeletes();
+ keyManager.deleteKey(args);
+ } catch (Exception ex) {
+ metrics.incNumKeyDeleteFails();
+ throw ex;
+ }
+ }
+
+ @Override
+ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+ String startKey, String keyPrefix, int maxKeys) throws IOException {
+ try {
+ metrics.incNumKeyLists();
+ return keyManager.listKeys(volumeName, bucketName,
+ startKey, keyPrefix, maxKeys);
+ } catch (IOException ex) {
+ metrics.incNumKeyListFails();
+ throw ex;
+ }
+ }
+
+ /**
+ * Sets bucket property from args.
+ * @param args - BucketArgs.
+ * @throws IOException
+ */
+ @Override
+ public void setBucketProperty(OmBucketArgs args)
+ throws IOException {
+ try {
+ metrics.incNumBucketUpdates();
+ bucketManager.setBucketProperty(args);
+ } catch (Exception ex) {
+ metrics.incNumBucketUpdateFails();
+ throw ex;
+ }
+ }
+
+
+ /**
+ * Deletes an existing empty bucket from volume.
+ * @param volume - Name of the volume.
+ * @param bucket - Name of the bucket.
+ * @throws IOException
+ */
+ public void deleteBucket(String volume, String bucket) throws IOException {
+ try {
+ metrics.incNumBucketDeletes();
+ bucketManager.deleteBucket(volume, bucket);
+ } catch (Exception ex) {
+ metrics.incNumBucketDeleteFails();
+ throw ex;
+ }
+ }
+
+ private void registerMXBean() {
+ Map<String, String> jmxProperties = new HashMap<String, String>();
+ jmxProperties.put("component", "ServerRuntime");
+ this.omInfoBeanName =
+ MBeans.register("OzoneManager",
+ "OzoneManagerInfo",
+ jmxProperties,
+ this);
+ }
+
+ private void unregisterMXBean() {
+ if (this.omInfoBeanName != null) {
+ MBeans.unregister(this.omInfoBeanName);
+ this.omInfoBeanName = null;
+ }
+ }
+
+ @Override
+ public String getRpcPort() {
+ return "" + omRpcAddress.getPort();
+ }
+
+ @VisibleForTesting
+ public OzoneManagerHttpServer getHttpServer() {
+ return httpServer;
+ }
+
+ @Override
+ public List<ServiceInfo> getServiceList() throws IOException {
+ // When we implement multi-home this call has to be handled properly.
+ List<ServiceInfo> services = new ArrayList<>();
+ ServiceInfo.Builder omServiceInfoBuilder = ServiceInfo.newBuilder()
+ .setNodeType(HddsProtos.NodeType.OM)
+ .setHostname(omRpcAddress.getHostName())
+ .addServicePort(ServicePort.newBuilder()
+ .setType(ServicePort.Type.RPC)
+ .setValue(omRpcAddress.getPort())
+ .build());
+ if (httpServer.getHttpAddress() != null) {
+ omServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
+ .setType(ServicePort.Type.HTTP)
+ .setValue(httpServer.getHttpAddress().getPort())
+ .build());
+ }
+ if (httpServer.getHttpsAddress() != null) {
+ omServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
+ .setType(ServicePort.Type.HTTPS)
+ .setValue(httpServer.getHttpsAddress().getPort())
+ .build());
+ }
+ services.add(omServiceInfoBuilder.build());
+
+ // For client we have to return SCM with container protocol port,
+ // not block protocol.
+ InetSocketAddress scmAddr = getScmAddressForClients(
+ configuration);
+ ServiceInfo.Builder scmServiceInfoBuilder = ServiceInfo.newBuilder()
+ .setNodeType(HddsProtos.NodeType.SCM)
+ .setHostname(scmAddr.getHostName())
+ .addServicePort(ServicePort.newBuilder()
+ .setType(ServicePort.Type.RPC)
+ .setValue(scmAddr.getPort()).build());
+ services.add(scmServiceInfoBuilder.build());
+
+ List<HddsProtos.Node> nodes = scmContainerClient.queryNode(HEALTHY,
+ HddsProtos.QueryScope.CLUSTER, "");
+
+ for (HddsProtos.Node node : nodes) {
+ HddsProtos.DatanodeDetailsProto datanode = node.getNodeID();
+
+ ServiceInfo.Builder dnServiceInfoBuilder = ServiceInfo.newBuilder()
+ .setNodeType(HddsProtos.NodeType.DATANODE)
+ .setHostname(datanode.getHostName());
+
+ dnServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
+ .setType(ServicePort.Type.HTTP)
+ .setValue(DatanodeDetails.getFromProtoBuf(datanode)
+ .getPort(DatanodeDetails.Port.Name.REST).getValue())
+ .build());
+
+ services.add(dnServiceInfoBuilder.build());
+ }
+
+ metrics.incNumGetServiceLists();
+ // For now there is no exception that can can happen in this call,
+ // so failure metrics is not handled. In future if there is any need to
+ // handle exception in this method, we need to incorporate
+ // metrics.incNumGetServiceListFails()
+ return services;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
new file mode 100644
index 0000000..bd6ab69
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.server.BaseHttpServer;
+
+import java.io.IOException;
+
+/**
+ * HttpServer wrapper for the OzoneManager.
+ */
+public class OzoneManagerHttpServer extends BaseHttpServer {
+
+ public OzoneManagerHttpServer(Configuration conf, OzoneManager om)
+ throws IOException {
+ super(conf, "ozoneManager");
+ addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class);
+ getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om);
+ }
+
+ @Override protected String getHttpAddressKey() {
+ return OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY;
+ }
+
+ @Override protected String getHttpBindHostKey() {
+ return OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY;
+ }
+
+ @Override protected String getHttpsAddressKey() {
+ return OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY;
+ }
+
+ @Override protected String getHttpsBindHostKey() {
+ return OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY;
+ }
+
+ @Override protected String getBindHostDefault() {
+ return OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_DEFAULT;
+ }
+
+ @Override protected int getHttpBindPortDefault() {
+ return OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
+ }
+
+ @Override protected int getHttpsBindPortDefault() {
+ return OMConfigKeys.OZONE_OM_HTTPS_BIND_PORT_DEFAULT;
+ }
+
+ @Override protected String getKeytabFile() {
+ return OMConfigKeys.OZONE_OM_KEYTAB_FILE;
+ }
+
+ @Override protected String getSpnegoPrincipal() {
+ return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
+ }
+
+ @Override protected String getEnabledKey() {
+ return OMConfigKeys.OZONE_OM_HTTP_ENABLED_KEY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java
new file mode 100644
index 0000000..47713e2
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+
+/**
+ * Provides REST access to Ozone Service List.
+ * <p>
+ * This servlet generally will be placed under the /serviceList URL of
+ * OzoneManager HttpServer.
+ *
+ * The return format is of JSON and in the form
+ * <p>
+ * <code><pre>
+ * {
+ * "services" : [
+ * {
+ * "NodeType":"OM",
+ * "Hostname" "$hostname",
+ * "ports" : {
+ * "$PortType" : "$port",
+ * ...
+ * }
+ * }
+ * ]
+ * }
+ * </pre></code>
+ * <p>
+ *
+ */
+public class ServiceListJSONServlet extends HttpServlet {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ServiceListJSONServlet.class);
+ private static final long serialVersionUID = 1L;
+
+ private transient OzoneManager om;
+
+ public void init() throws ServletException {
+ this.om = (OzoneManager) getServletContext()
+ .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
+ }
+
+ /**
+ * Process a GET request for the specified resource.
+ *
+ * @param request
+ * The servlet request we are processing
+ * @param response
+ * The servlet response we are creating
+ */
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ response.setContentType("application/json; charset=utf8");
+ PrintWriter writer = response.getWriter();
+ try {
+ writer.write(objectMapper.writeValueAsString(om.getServiceList()));
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ } catch (IOException e) {
+ LOG.error(
+ "Caught an exception while processing ServiceList request", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
new file mode 100644
index 0000000..8475dd9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OzoneAclInfo;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * OM volume manager interface.
+ */
+public interface VolumeManager {
+
+ /**
+ * Create a new volume.
+ * @param args - Volume args to create a volume
+ */
+ void createVolume(OmVolumeArgs args) throws IOException;
+
+ /**
+ * Changes the owner of a volume.
+ *
+ * @param volume - Name of the volume.
+ * @param owner - Name of the owner.
+ * @throws IOException
+ */
+ void setOwner(String volume, String owner) throws IOException;
+
+ /**
+ * Changes the Quota on a volume.
+ *
+ * @param volume - Name of the volume.
+ * @param quota - Quota in bytes.
+ * @throws IOException
+ */
+ void setQuota(String volume, long quota) throws IOException;
+
+ /**
+ * Gets the volume information.
+ * @param volume - Volume name.
+ * @return VolumeArgs or exception is thrown.
+ * @throws IOException
+ */
+ OmVolumeArgs getVolumeInfo(String volume) throws IOException;
+
+ /**
+ * Deletes an existing empty volume.
+ *
+ * @param volume - Name of the volume.
+ * @throws IOException
+ */
+ void deleteVolume(String volume) throws IOException;
+
+ /**
+ * Checks if the specified user with a role can access this volume.
+ *
+ * @param volume - volume
+ * @param userAcl - user acl which needs to be checked for access
+ * @return true if the user has access for the volume, false otherwise
+ * @throws IOException
+ */
+ boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+ throws IOException;
+
+ /**
+ * Returns a list of volumes owned by a given user; if user is null,
+ * returns all volumes.
+ *
+ * @param userName
+ * volume owner
+ * @param prefix
+ * the volume prefix used to filter the listing result.
+ * @param startKey
+ * the start volume name determines where to start listing from,
+ * this key is excluded from the result.
+ * @param maxKeys
+ * the maximum number of volumes to return.
+ * @return a list of {@link OmVolumeArgs}
+ * @throws IOException
+ */
+ List<OmVolumeArgs> listVolumes(String userName, String prefix,
+ String startKey, int maxKeys) throws IOException;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org