You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/08/08 20:36:37 UTC

[2/8] hadoop git commit: HDFS-12259. Ozone: OzoneClient: Refactor move ozone client from hadoop-hdfs to hadoop-hdfs-client. Contributed by Nandakumar.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
deleted file mode 100644
index 59c2639..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.web.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
-import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Maintaining a list of ChunkInputStream. Write based on offset.
- *
- * Note that this may write to multiple containers in one write call. In case
- * that first container succeeded but later ones failed, the succeeded writes
- * are not rolled back.
- *
- * TODO : currently not support multi-thread access.
- */
-public class ChunkGroupOutputStream extends OutputStream {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ChunkGroupOutputStream.class);
-
-  // array list's get(index) is O(1)
-  private final ArrayList<ChunkOutputStreamEntry> streamEntries;
-  private int currentStreamIndex;
-  private long totalSize;
-  private long byteOffset;
-
-  //This has to be removed once HDFS-11888 is resolved.
-  //local cache which will have list of created container names.
-  private static Set<String> containersCreated = new HashSet<>();
-
-  public ChunkGroupOutputStream() {
-    this.streamEntries = new ArrayList<>();
-    this.currentStreamIndex = 0;
-    this.totalSize = 0;
-    this.byteOffset = 0;
-  }
-
-  @VisibleForTesting
-  public long getByteOffset() {
-    return byteOffset;
-  }
-
-  /**
-   * Append another stream to the end of the list. Note that the streams are not
-   * actually created to this point, only enough meta data about the stream is
-   * stored. When something is to be actually written to the stream, the stream
-   * will be created (if not already).
-   *
-   * @param containerKey the key to store in the container
-   * @param key the ozone key
-   * @param xceiverClientManager xceiver manager instance
-   * @param xceiverClient xceiver manager instance
-   * @param requestID the request id
-   * @param chunkSize the chunk size for this key chunks
-   * @param length the total length of this key
-   */
-  public synchronized void addStream(String containerKey, String key,
-      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
-      String requestID, int chunkSize, long length) {
-    streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
-        xceiverClientManager, xceiverClient, requestID, chunkSize, length));
-    totalSize += length;
-  }
-
-  @VisibleForTesting
-  public synchronized void addStream(OutputStream outputStream, long length) {
-    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
-    totalSize += length;
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    if (streamEntries.size() <= currentStreamIndex) {
-      throw new IndexOutOfBoundsException();
-    }
-    ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
-    entry.write(b);
-    if (entry.getRemaining() <= 0) {
-      currentStreamIndex += 1;
-    }
-    byteOffset += 1;
-  }
-
-  /**
-   * Try to write the bytes sequence b[off:off+len) to streams.
-   *
-   * NOTE: Throws exception if the data could not fit into the remaining space.
-   * In which case nothing will be written.
-   * TODO:May need to revisit this behaviour.
-   *
-   * @param b byte data
-   * @param off starting offset
-   * @param len length to write
-   * @throws IOException
-   */
-  @Override
-  public synchronized void write(byte[] b, int off, int len)
-      throws IOException {
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return;
-    }
-    if (streamEntries.size() <= currentStreamIndex) {
-      throw new IOException("Write out of stream range! stream index:" +
-          currentStreamIndex);
-    }
-    if (totalSize - byteOffset < len) {
-      throw new IOException("Can not write " + len + " bytes with only " +
-          (totalSize - byteOffset) + " byte space");
-    }
-    while (len > 0) {
-      // in theory, this condition should never violate due the check above
-      // still do a sanity check.
-      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
-      ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int writeLen = Math.min(len, (int)current.getRemaining());
-      current.write(b, off, writeLen);
-      if (current.getRemaining() <= 0) {
-        currentStreamIndex += 1;
-      }
-      len -= writeLen;
-      off += writeLen;
-      byteOffset += writeLen;
-    }
-  }
-
-  @Override
-  public synchronized void flush() throws IOException {
-    for (int i = 0; i <= currentStreamIndex; i++) {
-      streamEntries.get(i).flush();
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    for (ChunkOutputStreamEntry entry : streamEntries) {
-      entry.close();
-    }
-  }
-
-  private static class ChunkOutputStreamEntry extends OutputStream {
-    private OutputStream outputStream;
-    private final String containerKey;
-    private final String key;
-    private final XceiverClientManager xceiverClientManager;
-    private final XceiverClientSpi xceiverClient;
-    private final String requestId;
-    private final int chunkSize;
-    // total number of bytes that should be written to this stream
-    private final long length;
-    // the current position of this stream 0 <= currentPosition < length
-    private long currentPosition;
-
-    ChunkOutputStreamEntry(String containerKey, String key,
-        XceiverClientManager xceiverClientManager,
-        XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length) {
-      this.outputStream = null;
-      this.containerKey = containerKey;
-      this.key = key;
-      this.xceiverClientManager = xceiverClientManager;
-      this.xceiverClient = xceiverClient;
-      this.requestId = requestId;
-      this.chunkSize = chunkSize;
-
-      this.length = length;
-      this.currentPosition = 0;
-    }
-
-    /**
-     * For testing purpose, taking a some random created stream instance.
-     * @param  outputStream a existing writable output stream
-     * @param  length the length of data to write to the stream
-     */
-    ChunkOutputStreamEntry(OutputStream outputStream, long length) {
-      this.outputStream = outputStream;
-      this.containerKey = null;
-      this.key = null;
-      this.xceiverClientManager = null;
-      this.xceiverClient = null;
-      this.requestId = null;
-      this.chunkSize = -1;
-
-      this.length = length;
-      this.currentPosition = 0;
-    }
-
-    long getLength() {
-      return length;
-    }
-
-    long getRemaining() {
-      return length - currentPosition;
-    }
-
-    private synchronized void checkStream() {
-      if (this.outputStream == null) {
-        this.outputStream = new ChunkOutputStream(containerKey,
-            key, xceiverClientManager, xceiverClient,
-            requestId, chunkSize);
-      }
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-      checkStream();
-      outputStream.write(b);
-      this.currentPosition += 1;
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-      checkStream();
-      outputStream.write(b, off, len);
-      this.currentPosition += len;
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if (this.outputStream != null) {
-        this.outputStream.flush();
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (this.outputStream != null) {
-        this.outputStream.close();
-      }
-    }
-  }
-
-  public static ChunkGroupOutputStream getFromKsmKeyInfo(
-      KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocolClientSideTranslatorPB
-          storageContainerLocationClient,
-      int chunkSize, String requestId) throws IOException {
-    // TODO: the following createContainer and key writes may fail, in which
-    // case we should revert the above allocateKey to KSM.
-    // check index as sanity check
-    int index = 0;
-    String containerKey;
-    ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
-    for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
-      containerKey = subKeyInfo.getBlockID();
-
-      Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
-      String containerName = subKeyInfo.getContainerName();
-      Pipeline pipeline =
-          storageContainerLocationClient.getContainer(containerName);
-      XceiverClientSpi xceiverClient =
-          xceiverClientManager.acquireClient(pipeline);
-      // create container if needed
-      // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
-      //The following change has to reverted once HDFS-11888 is fixed.
-      if(!containersCreated.contains(containerName)) {
-        synchronized (containerName.intern()) {
-          //checking again, there is a chance that some other thread has
-          // created it.
-          if (!containersCreated.contains(containerName)) {
-            LOG.debug("Need to create container {}.", containerName);
-            try {
-              ContainerProtocolCalls.createContainer(xceiverClient, requestId);
-            } catch (StorageContainerException ex) {
-              if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
-                //container already exist.
-                LOG.debug("Container {} already exists.", containerName);
-              } else {
-                LOG.error("Container creation failed for {}.",
-                    containerName, ex);
-                throw ex;
-              }
-            }
-            containersCreated.add(containerName);
-          }
-        }
-      }
-
-      groupOutputStream.addStream(containerKey, keyInfo.getKeyName(),
-          xceiverClientManager, xceiverClient, requestId, chunkSize,
-          subKeyInfo.getLength());
-    }
-    return groupOutputStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 713a085..76c9be0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -19,20 +19,21 @@
 package org.apache.hadoop.ozone.web.storage;
 
 import com.google.common.base.Strings;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset
-    .LengthInputStream;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocolPB
+import org.apache.hadoop.ozone.client.io.LengthInputStream;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-import org.apache.hadoop.ozone.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
deleted file mode 100644
index 18ade6e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.web.storage;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
-import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-import org.apache.hadoop.ozone.web.request.OzoneQuota;
-import org.apache.hadoop.ozone.web.response.BucketInfo;
-import org.apache.hadoop.ozone.web.response.KeyInfo;
-import org.apache.hadoop.ozone.web.response.VolumeInfo;
-import org.apache.hadoop.ozone.web.response.VolumeOwner;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * This class contains methods that define the translation between the Ozone
- * domain model and the storage container domain model.
- */
-final class OzoneContainerTranslation {
-
-  private static final String ACLS = "ACLS";
-  private static final String BUCKET = "BUCKET";
-  private static final String BUCKET_NAME = "BUCKET_NAME";
-  private static final String CREATED_BY = "CREATED_BY";
-  private static final String CREATED_ON = "CREATED_ON";
-  private static final String KEY = "KEY";
-  private static final String OWNER = "OWNER";
-  private static final String QUOTA = "QUOTA";
-  private static final String STORAGE_TYPE = "STORAGE_TYPE";
-  private static final String TYPE = "TYPE";
-  private static final String VERSIONING = "VERSIONING";
-  private static final String VOLUME = "VOLUME";
-  private static final String VOLUME_NAME = "VOLUME_NAME";
-
-  /**
-   * Creates key data intended for reading a container key.
-   *
-   * @param containerName container name
-   * @param containerKey container key
-   * @return KeyData intended for reading the container key
-   */
-  public static KeyData containerKeyDataForRead(String containerName,
-      String containerKey) {
-    return KeyData
-        .newBuilder()
-        .setContainerName(containerName)
-        .setName(containerKey)
-        .build();
-  }
-
-  /**
-   * Translates a bucket to its container representation.
-   *
-   * @param containerName container name
-   * @param containerKey container key
-   * @param bucket the bucket to translate
-   * @return KeyData representation of bucket
-   */
-  public static KeyData fromBucketToContainerKeyData(
-      String containerName, String containerKey, BucketInfo bucket) {
-    KeyData.Builder containerKeyData = KeyData
-        .newBuilder()
-        .setContainerName(containerName)
-        .setName(containerKey)
-        .addMetadata(newKeyValue(TYPE, BUCKET))
-        .addMetadata(newKeyValue(VOLUME_NAME, bucket.getVolumeName()))
-        .addMetadata(newKeyValue(BUCKET_NAME, bucket.getBucketName()));
-
-    if (bucket.getAcls() != null) {
-      containerKeyData.addMetadata(newKeyValue(ACLS,
-          StringUtils.join(',', bucket.getAcls())));
-    }
-
-    if (bucket.getVersioning() != null &&
-        bucket.getVersioning() != Versioning.NOT_DEFINED) {
-      containerKeyData.addMetadata(newKeyValue(VERSIONING,
-          bucket.getVersioning().name()));
-    }
-
-    if (bucket.getStorageType() != StorageType.RAM_DISK) {
-      containerKeyData.addMetadata(newKeyValue(STORAGE_TYPE,
-          bucket.getStorageType().name()));
-    }
-
-    return containerKeyData.build();
-  }
-
-  /**
-   * Translates a bucket from its container representation.
-   *
-   * @param metadata container metadata representing the bucket
-   * @return bucket translated from container representation
-   */
-  public static BucketInfo fromContainerKeyValueListToBucket(
-      List<KeyValue> metadata) {
-    BucketInfo bucket = new BucketInfo();
-    for (KeyValue keyValue : metadata) {
-      switch (keyValue.getKey()) {
-      case VOLUME_NAME:
-        bucket.setVolumeName(keyValue.getValue());
-        break;
-      case BUCKET_NAME:
-        bucket.setBucketName(keyValue.getValue());
-        break;
-      case VERSIONING:
-        bucket.setVersioning(
-            Enum.valueOf(Versioning.class, keyValue.getValue()));
-        break;
-      case STORAGE_TYPE:
-        bucket.setStorageType(
-            Enum.valueOf(StorageType.class, keyValue.getValue()));
-        break;
-      default:
-        break;
-      }
-    }
-    return bucket;
-  }
-
-  /**
-   * Translates a volume from its container representation.
-   *
-   * @param metadata container metadata representing the volume
-   * @return volume translated from container representation
-   */
-  public static VolumeInfo fromContainerKeyValueListToVolume(
-      List<KeyValue> metadata) {
-    VolumeInfo volume = new VolumeInfo();
-    for (KeyValue keyValue : metadata) {
-      switch (keyValue.getKey()) {
-      case VOLUME_NAME:
-        volume.setVolumeName(keyValue.getValue());
-        break;
-      case CREATED_BY:
-        volume.setCreatedBy(keyValue.getValue());
-        break;
-      case CREATED_ON:
-        volume.setCreatedOn(keyValue.getValue());
-        break;
-      case OWNER:
-        volume.setOwner(new VolumeOwner(keyValue.getValue()));
-        break;
-      case QUOTA:
-        volume.setQuota(OzoneQuota.parseQuota(keyValue.getValue()));
-        break;
-      default:
-        break;
-      }
-    }
-    return volume;
-  }
-
-  /**
-   * Translates a key to its container representation.
-   *
-   * @param containerName container name
-   * @param containerKey container key
-   * @param keyInfo key information received from call
-   * @return KeyData intended for reading the container key
-   */
-  public static KeyData fromKeyToContainerKeyData(String containerName,
-      String containerKey, KeyInfo key) {
-    return KeyData
-        .newBuilder()
-        .setContainerName(containerName)
-        .setName(containerKey)
-        .addMetadata(newKeyValue(TYPE, KEY))
-        .build();
-  }
-
-  /**
-   * Translates a key to its container representation.  The return value is a
-   * builder that can be manipulated further before building the result.
-   *
-   * @param containerName container name
-   * @param containerKey container key
-   * @param keyInfo key information received from call
-   * @return KeyData builder
-   */
-  public static KeyData.Builder fromKeyToContainerKeyDataBuilder(
-      String containerName, String containerKey, KeyInfo key) {
-    return KeyData
-        .newBuilder()
-        .setContainerName(containerName)
-        .setName(containerKey)
-        .addMetadata(newKeyValue(TYPE, KEY));
-  }
-
-  /**
-   * Translates a volume to its container representation.
-   *
-   * @param containerName container name
-   * @param containerKey container key
-   * @param volume the volume to translate
-   * @return KeyData representation of volume
-   */
-  public static KeyData fromVolumeToContainerKeyData(
-      String containerName, String containerKey, VolumeInfo volume) {
-    KeyData.Builder containerKeyData = KeyData
-        .newBuilder()
-        .setContainerName(containerName)
-        .setName(containerKey)
-        .addMetadata(newKeyValue(TYPE, VOLUME))
-        .addMetadata(newKeyValue(VOLUME_NAME, volume.getVolumeName()))
-        .addMetadata(newKeyValue(CREATED_ON, volume.getCreatedOn()));
-
-    if (volume.getQuota() != null && volume.getQuota().sizeInBytes() != -1L) {
-      containerKeyData.addMetadata(newKeyValue(QUOTA,
-          OzoneQuota.formatQuota(volume.getQuota())));
-    }
-
-    if (volume.getOwner() != null && volume.getOwner().getName() != null &&
-        !volume.getOwner().getName().isEmpty()) {
-      containerKeyData.addMetadata(newKeyValue(OWNER,
-          volume.getOwner().getName()));
-    }
-
-    if (volume.getCreatedBy() != null && !volume.getCreatedBy().isEmpty()) {
-      containerKeyData.addMetadata(
-          newKeyValue(CREATED_BY, volume.getCreatedBy()));
-    }
-
-    return containerKeyData.build();
-  }
-
-  /**
-   * Translates a key-value pair to its container representation.
-   *
-   * @param key the key
-   * @param value the value
-   * @return container representation of key-value pair
-   */
-  private static KeyValue newKeyValue(String key, Object value) {
-    return KeyValue.newBuilder().setKey(key).setValue(value.toString()).build();
-  }
-
-  /**
-   * There is no need to instantiate this class.
-   */
-  private OzoneContainerTranslation() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/userauth/Simple.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/userauth/Simple.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/userauth/Simple.java
index 609a47b..6cce47e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/userauth/Simple.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/userauth/Simple.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.headers.Header;
+import org.apache.hadoop.ozone.client.rest.headers.Header;
 import org.apache.hadoop.ozone.web.interfaces.UserAuth;
 import org.apache.hadoop.ozone.OzoneConsts;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
index c417601..2fe64d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
@@ -23,11 +23,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.headers.Header;
+import org.apache.hadoop.ozone.client.rest.headers.Header;
 import org.apache.hadoop.util.Time;
 
 import javax.ws.rs.core.HttpHeaders;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
index 581ccc6..4d69ba9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/Corona.java
@@ -26,7 +26,9 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.ozone.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -132,7 +134,7 @@ public final class Corona extends Configured implements Tool {
     numberOfBucketsCreated = new AtomicInteger();
     numberOfKeysAdded = new AtomicLong();
     OzoneClientFactory.setConfiguration(conf);
-    ozoneClient = OzoneClientFactory.getRpcClient();
+    ozoneClient = OzoneClientFactory.getClient();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientImpl.java
deleted file mode 100644
index b861f7d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientImpl.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone;
-
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.ozone.io.OzoneInputStream;
-import org.apache.hadoop.ozone.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * This class is to test all the public facing APIs of Ozone Client.
- */
-public class TestOzoneClientImpl {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  private static MiniOzoneCluster cluster = null;
-  private static OzoneClientImpl ozClient = null;
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true and
-   * OZONE_HANDLER_TYPE_KEY = "distributed"
-   *
-   * @throws IOException
-   */
-  @BeforeClass
-  public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = new MiniOzoneCluster.Builder(conf)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-    ozClient = new OzoneClientImpl(conf);
-  }
-
-  @Test
-  public void testCreateVolume()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    OzoneVolume volume = ozClient.getVolumeDetails(volumeName);
-    Assert.assertEquals(volumeName, volume.getVolumeName());
-  }
-
-  @Test
-  public void testCreateVolumeWithOwner()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName, "test");
-    OzoneVolume volume = ozClient.getVolumeDetails(volumeName);
-    Assert.assertEquals(volumeName, volume.getVolumeName());
-    Assert.assertEquals("test", volume.getOwnerName());
-  }
-
-  @Test
-  public void testCreateVolumeWithQuota()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName, "test",
-        10000000000L);
-    OzoneVolume volume = ozClient.getVolumeDetails(volumeName);
-    Assert.assertEquals(volumeName, volume.getVolumeName());
-    Assert.assertEquals("test", volume.getOwnerName());
-    Assert.assertEquals(10000000000L, volume.getQuota());
-  }
-
-  @Test
-  public void testVolumeAlreadyExist()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    try {
-      ozClient.createVolume(volumeName);
-    } catch (IOException ex) {
-      Assert.assertEquals(
-          "Volume creation failed, error:VOLUME_ALREADY_EXISTS",
-          ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testSetVolumeOwner()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.setVolumeOwner(volumeName, "test");
-    OzoneVolume volume = ozClient.getVolumeDetails(volumeName);
-    Assert.assertEquals("test", volume.getOwnerName());
-  }
-
-  @Test
-  public void testSetVolumeQuota()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.setVolumeQuota(volumeName, 10000000000L);
-    OzoneVolume volume = ozClient.getVolumeDetails(volumeName);
-    Assert.assertEquals(10000000000L, volume.getQuota());
-  }
-
-  @Test
-  public void testDeleteVolume()
-      throws IOException, OzoneException {
-    thrown.expectMessage("Info Volume failed, error");
-    String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    OzoneVolume volume = ozClient.getVolumeDetails(volumeName);
-    Assert.assertNotNull(volume);
-    ozClient.deleteVolume(volumeName);
-    ozClient.getVolumeDetails(volumeName);
-  }
-
-  @Test
-  public void testCreateBucket()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-  }
-
-  @Test
-  public void testCreateBucketWithVersioning()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName,
-        OzoneConsts.Versioning.ENABLED);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertEquals(OzoneConsts.Versioning.ENABLED,
-        bucket.getVersioning());
-  }
-
-  @Test
-  public void testCreateBucketWithStorageType()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName, StorageType.SSD);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertEquals(StorageType.SSD, bucket.getStorageType());
-  }
-
-  @Test
-  public void testCreateBucketWithAcls()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, "test",
-        OzoneAcl.OzoneACLRights.READ_WRITE);
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName, userAcl);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertTrue(bucket.getAcls().contains(userAcl));
-  }
-
-  @Test
-  public void testCreateBucketWithAllArgument()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, "test",
-        OzoneAcl.OzoneACLRights.READ_WRITE);
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName,
-        OzoneConsts.Versioning.ENABLED,
-        StorageType.SSD, userAcl);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertEquals(OzoneConsts.Versioning.ENABLED,
-        bucket.getVersioning());
-    Assert.assertEquals(StorageType.SSD, bucket.getStorageType());
-    Assert.assertTrue(bucket.getAcls().contains(userAcl));
-  }
-
-  @Test
-  public void testCreateBucketInInvalidVolume()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    try {
-      ozClient.createBucket(volumeName, bucketName);
-    } catch (IOException ex) {
-      Assert.assertEquals(
-          "Bucket creation failed, error: VOLUME_NOT_FOUND",
-          ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testAddBucketAcl()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    List<OzoneAcl> acls = new ArrayList<>();
-    acls.add(new OzoneAcl(
-        OzoneAcl.OzoneACLType.USER, "test",
-        OzoneAcl.OzoneACLRights.READ_WRITE));
-    ozClient.addBucketAcls(volumeName, bucketName, acls);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertTrue(bucket.getAcls().contains(acls.get(0)));
-  }
-
-  @Test
-  public void testRemoveBucketAcl()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, "test",
-        OzoneAcl.OzoneACLRights.READ_WRITE);
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName, userAcl);
-    List<OzoneAcl> acls = new ArrayList<>();
-    acls.add(userAcl);
-    ozClient.removeBucketAcls(volumeName, bucketName, acls);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertTrue(!bucket.getAcls().contains(acls.get(0)));
-  }
-
-  @Test
-  public void testSetBucketVersioning()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    ozClient.setBucketVersioning(volumeName, bucketName,
-        OzoneConsts.Versioning.ENABLED);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertEquals(OzoneConsts.Versioning.ENABLED,
-        bucket.getVersioning());
-  }
-
-  @Test
-  public void testSetBucketStorageType()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    ozClient.setBucketStorageType(volumeName, bucketName,
-        StorageType.SSD);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertEquals(bucketName, bucket.getBucketName());
-    Assert.assertEquals(StorageType.SSD, bucket.getStorageType());
-  }
-
-
-  @Test
-  public void testDeleteBucket()
-      throws IOException, OzoneException {
-    thrown.expectMessage("Info Bucket failed, error");
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    OzoneBucket bucket = ozClient.getBucketDetails(volumeName, bucketName);
-    Assert.assertNotNull(bucket);
-    ozClient.deleteBucket(volumeName, bucketName);
-    ozClient.getBucketDetails(volumeName, bucketName);
-  }
-
-
-  @Test
-  public void testPutKey()
-      throws IOException, OzoneException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
-    String value = "sample value";
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
-        keyName, value.getBytes().length);
-    out.write(value.getBytes());
-    out.close();
-    OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
-    Assert.assertEquals(keyName, key.getKeyName());
-    OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
-    byte[] fileContent = new byte[value.getBytes().length];
-    is.read(fileContent);
-    Assert.assertEquals(value, new String(fileContent));
-  }
-
-  @Test
-  public void testDeleteKey()
-      throws IOException, OzoneException {
-    thrown.expectMessage("Lookup key failed, error");
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
-    String value = "sample value";
-    ozClient.createVolume(volumeName);
-    ozClient.createBucket(volumeName, bucketName);
-    OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
-        keyName, value.getBytes().length);
-    out.write(value.getBytes());
-    out.close();
-    OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
-    Assert.assertEquals(keyName, key.getKeyName());
-    ozClient.deleteKey(volumeName, bucketName, keyName);
-    ozClient.getKeyDetails(volumeName, bucketName, keyName);
-  }
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  @AfterClass
-  public static void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientUtils.java
deleted file mode 100644
index a5dbea7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientUtils.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import org.junit.Rule;
-import org.junit.rules.Timeout;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This test class verifies the parsing of SCM endpoint config settings.
- * The parsing logic is in {@link OzoneClientUtils}.
- */
-public class TestOzoneClientUtils {
-  @Rule
-  public Timeout timeout = new Timeout(300000);
-
-  @Rule
-  public ExpectedException thrown= ExpectedException.none();
-
-  /**
-   * Verify client endpoint lookup failure if it is not configured.
-   */
-  @Test
-  public void testMissingScmClientAddress() {
-    final Configuration conf = new OzoneConfiguration();
-    thrown.expect(IllegalArgumentException.class);
-    OzoneClientUtils.getScmAddressForClients(conf);
-  }
-
-  /**
-   * Verify that the client endpoint can be correctly parsed from
-   * configuration.
-   */
-  @Test
-  public void testGetScmClientAddress() {
-    final Configuration conf = new OzoneConfiguration();
-
-    // First try a client address with just a host name. Verify it falls
-    // back to the default port.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
-    InetSocketAddress addr = OzoneClientUtils.getScmAddressForClients(conf);
-    assertThat(addr.getHostString(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
-
-    // Next try a client address with a host name and port. Verify both
-    // are used correctly.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    addr = OzoneClientUtils.getScmAddressForClients(conf);
-    assertThat(addr.getHostString(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(100));
-  }
-
-  /**
-   * Verify DataNode endpoint lookup failure if neither the client nor
-   * datanode endpoint are configured.
-   */
-  @Test
-  public void testMissingScmDataNodeAddress() {
-    final Configuration conf = new OzoneConfiguration();
-    thrown.expect(IllegalArgumentException.class);
-    OzoneClientUtils.getScmAddressForDataNodes(conf);
-  }
-
-  /**
-   * Verify that the datanode endpoint is parsed correctly.
-   * This tests the logic used by the DataNodes to determine which address
-   * to connect to.
-   */
-  @Test
-  public void testGetScmDataNodeAddress() {
-    final Configuration conf = new OzoneConfiguration();
-
-    // First try a client address with just a host name. Verify it falls
-    // back to the default port.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
-    InetSocketAddress addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
-    assertThat(addr.getHostString(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-
-    // Next try a client address with just a host name and port. Verify the port
-    // is ignored and the default DataNode port is used.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
-    assertThat(addr.getHostString(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-
-    // Set both OZONE_SCM_CLIENT_ADDRESS_KEY and OZONE_SCM_DATANODE_ADDRESS_KEY.
-    // Verify that the latter overrides and the port number is still the default.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8");
-    addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
-    assertThat(addr.getHostString(), is("5.6.7.8"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-
-    // Set both OZONE_SCM_CLIENT_ADDRESS_KEY and OZONE_SCM_DATANODE_ADDRESS_KEY.
-    // Verify that the latter overrides and the port number from the latter is
-    // used.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8:200");
-    addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
-    assertThat(addr.getHostString(), is("5.6.7.8"));
-    assertThat(addr.getPort(), is(200));
-  }
-
-  /**
-   * Verify that the client endpoint bind address is computed correctly.
-   * This tests the logic used by the SCM to determine its own bind address.
-   */
-  @Test
-  public void testScmClientBindHostDefault() {
-    final Configuration conf = new OzoneConfiguration();
-
-    // The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY
-    // is set differently.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
-    InetSocketAddress addr = OzoneClientUtils.getScmClientBindAddress(conf);
-    assertThat(addr.getHostString(), is("0.0.0.0"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
-
-    // The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY
-    // is set differently. The port number from OZONE_SCM_CLIENT_ADDRESS_KEY
-    // should be respected.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
-    addr = OzoneClientUtils.getScmClientBindAddress(conf);
-    assertThat(addr.getHostString(), is("0.0.0.0"));
-    assertThat(addr.getPort(), is(100));
-
-    // OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected.
-    // Port number should be default if none is specified via
-    // OZONE_SCM_DATANODE_ADDRESS_KEY.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4");
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8");
-    addr = OzoneClientUtils.getScmClientBindAddress(conf);
-    assertThat(addr.getHostString(), is("5.6.7.8"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
-
-    // OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected.
-    // Port number from OZONE_SCM_CLIENT_ADDRESS_KEY should be
-    // respected.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8");
-    addr = OzoneClientUtils.getScmClientBindAddress(conf);
-    assertThat(addr.getHostString(), is("5.6.7.8"));
-    assertThat(addr.getPort(), is(100));
-  }
-
-  /**
-   * Verify that the DataNode endpoint bind address is computed correctly.
-   * This tests the logic used by the SCM to determine its own bind address.
-   */
-  @Test
-  public void testScmDataNodeBindHostDefault() {
-    final Configuration conf = new OzoneConfiguration();
-
-    // The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY
-    // is set differently.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
-    InetSocketAddress addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
-    assertThat(addr.getHostString(), is("0.0.0.0"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-
-    // The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY
-    // is set differently. The port number from OZONE_SCM_DATANODE_ADDRESS_KEY
-    // should be respected.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
-    addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
-    assertThat(addr.getHostString(), is("0.0.0.0"));
-    assertThat(addr.getPort(), is(200));
-
-    // OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected.
-    // Port number should be default if none is specified via
-    // OZONE_SCM_DATANODE_ADDRESS_KEY.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8");
-    addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
-    assertThat(addr.getHostString(), is("5.6.7.8"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
-
-    // OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected.
-    // Port number from OZONE_SCM_DATANODE_ADDRESS_KEY should be
-    // respected.
-    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
-    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8");
-    addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
-    assertThat(addr.getHostString(), is("5.6.7.8"));
-    assertThat(addr.getPort(), is(200));
-  }
-
-  @Test
-  public void testGetSCMAddresses() {
-    final Configuration conf = new OzoneConfiguration();
-    Collection<InetSocketAddress> addresses = null;
-    InetSocketAddress addr = null;
-    Iterator<InetSocketAddress> it = null;
-
-    // Verify valid IP address setup
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
-    addresses = OzoneClientUtils.getSCMAddresses(conf);
-    assertThat(addresses.size(), is(1));
-    addr = addresses.iterator().next();
-    assertThat(addr.getHostName(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT));
-
-    // Verify valid hostname setup
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
-    addresses = OzoneClientUtils.getSCMAddresses(conf);
-    assertThat(addresses.size(), is(1));
-    addr = addresses.iterator().next();
-    assertThat(addr.getHostName(), is("scm1"));
-    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT));
-
-    // Verify valid hostname and port
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
-    addresses = OzoneClientUtils.getSCMAddresses(conf);
-    assertThat(addresses.size(), is(1));
-    addr = addresses.iterator().next();
-    assertThat(addr.getHostName(), is("scm1"));
-    assertThat(addr.getPort(), is(1234));
-
-    final HashMap<String, Integer> hostsAndPorts =
-        new HashMap<String, Integer>();
-    hostsAndPorts.put("scm1", 1234);
-    hostsAndPorts.put("scm2", 2345);
-    hostsAndPorts.put("scm3", 3456);
-
-    // Verify multiple hosts and port
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
-    addresses = OzoneClientUtils.getSCMAddresses(conf);
-    assertThat(addresses.size(), is(3));
-    it = addresses.iterator();
-    HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
-    while(it.hasNext()) {
-      InetSocketAddress current = it.next();
-      assertTrue(expected1.remove(current.getHostName(),
-          current.getPort()));
-    }
-    assertTrue(expected1.isEmpty());
-
-    // Verify names with spaces
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
-    addresses = OzoneClientUtils.getSCMAddresses(conf);
-    assertThat(addresses.size(), is(3));
-    it = addresses.iterator();
-    HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
-    while(it.hasNext()) {
-      InetSocketAddress current = it.next();
-      assertTrue(expected2.remove(current.getHostName(),
-          current.getPort()));
-    }
-    assertTrue(expected2.isEmpty());
-
-    // Verify empty value
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
-    try {
-      addresses = OzoneClientUtils.getSCMAddresses(conf);
-      fail("Empty value should cause an IllegalArgumentException");
-    } catch (Exception e) {
-      assertTrue(e instanceof IllegalArgumentException);
-    }
-
-    // Verify invalid hostname
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
-    try {
-      addresses = OzoneClientUtils.getSCMAddresses(conf);
-      fail("An invalid hostname should cause an IllegalArgumentException");
-    } catch (Exception e) {
-      assertTrue(e instanceof IllegalArgumentException);
-    }
-
-    // Verify invalid port
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
-    try {
-      addresses = OzoneClientUtils.getSCMAddresses(conf);
-      fail("An invalid port should cause an IllegalArgumentException");
-    } catch (Exception e) {
-      assertTrue(e instanceof IllegalArgumentException);
-    }
-
-    // Verify a mixed case (valid and invalid value both appears)
-    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
-    try {
-      addresses = OzoneClientUtils.getSCMAddresses(conf);
-      fail("An invalid value should cause an IllegalArgumentException");
-    } catch (Exception e) {
-      assertTrue(e instanceof IllegalArgumentException);
-    }
-  }
-
-  @Test
-  public void testGetKSMAddress() {
-    final Configuration conf = new OzoneConfiguration();
-
-    // First try a client address with just a host name. Verify it falls
-    // back to the default port.
-    conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "1.2.3.4");
-    InetSocketAddress addr = OzoneClientUtils.getKsmAddress(conf);
-    assertThat(addr.getHostString(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(KSMConfigKeys.OZONE_KSM_PORT_DEFAULT));
-
-    // Next try a client address with just a host name and port. Verify the port
-    // is ignored and the default KSM port is used.
-    conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "1.2.3.4:100");
-    addr = OzoneClientUtils.getKsmAddress(conf);
-    assertThat(addr.getHostString(), is("1.2.3.4"));
-    assertThat(addr.getPort(), is(100));
-
-    // Assert the we are able to use default configs if no value is specified.
-    conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "");
-    addr = OzoneClientUtils.getKsmAddress(conf);
-    assertThat(addr.getHostString(), is("0.0.0.0"));
-    assertThat(addr.getPort(), is(KSMConfigKeys.OZONE_KSM_PORT_DEFAULT));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClientUtils.java
new file mode 100644
index 0000000..4ca5d83
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClientUtils.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test class verifies the parsing of SCM endpoint config settings.
+ * The parsing logic is in {@link OzoneClientUtils}.
+ */
+public class TestOzoneClientUtils {
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  @Rule
+  public ExpectedException thrown= ExpectedException.none();
+
+  /**
+   * Verify client endpoint lookup failure if it is not configured.
+   */
+  @Test
+  public void testMissingScmClientAddress() {
+    final Configuration conf = new OzoneConfiguration();
+    thrown.expect(IllegalArgumentException.class);
+    OzoneClientUtils.getScmAddressForClients(conf);
+  }
+
+  /**
+   * Verify that the client endpoint can be correctly parsed from
+   * configuration.
+   */
+  @Test
+  public void testGetScmClientAddress() {
+    final Configuration conf = new OzoneConfiguration();
+
+    // First try a client address with just a host name. Verify it falls
+    // back to the default port.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
+    InetSocketAddress addr = OzoneClientUtils.getScmAddressForClients(conf);
+    assertThat(addr.getHostString(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
+
+    // Next try a client address with a host name and port. Verify both
+    // are used correctly.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    addr = OzoneClientUtils.getScmAddressForClients(conf);
+    assertThat(addr.getHostString(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(100));
+  }
+
+  /**
+   * Verify DataNode endpoint lookup failure if neither the client nor
+   * datanode endpoint are configured.
+   */
+  @Test
+  public void testMissingScmDataNodeAddress() {
+    final Configuration conf = new OzoneConfiguration();
+    thrown.expect(IllegalArgumentException.class);
+    OzoneClientUtils.getScmAddressForDataNodes(conf);
+  }
+
+  /**
+   * Verify that the datanode endpoint is parsed correctly.
+   * This tests the logic used by the DataNodes to determine which address
+   * to connect to.
+   */
+  @Test
+  public void testGetScmDataNodeAddress() {
+    final Configuration conf = new OzoneConfiguration();
+
+    // First try a client address with just a host name. Verify it falls
+    // back to the default port.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
+    InetSocketAddress addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
+    assertThat(addr.getHostString(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(
+        ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+
+    // Next try a client address with just a host name and port.
+    // Verify the port is ignored and the default DataNode port is used.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
+    assertThat(addr.getHostString(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(
+        ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+
+    // Set both OZONE_SCM_CLIENT_ADDRESS_KEY and
+    // OZONE_SCM_DATANODE_ADDRESS_KEY.
+    // Verify that the latter overrides and the port number is still the
+    // default.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8");
+    addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
+    assertThat(addr.getHostString(), is("5.6.7.8"));
+    assertThat(addr.getPort(), is(
+        ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+
+    // Set both OZONE_SCM_CLIENT_ADDRESS_KEY and
+    // OZONE_SCM_DATANODE_ADDRESS_KEY.
+    // Verify that the latter overrides and the port number from the latter is
+    // used.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "5.6.7.8:200");
+    addr = OzoneClientUtils.getScmAddressForDataNodes(conf);
+    assertThat(addr.getHostString(), is("5.6.7.8"));
+    assertThat(addr.getPort(), is(200));
+  }
+
+  /**
+   * Verify that the client endpoint bind address is computed correctly.
+   * This tests the logic used by the SCM to determine its own bind address.
+   */
+  @Test
+  public void testScmClientBindHostDefault() {
+    final Configuration conf = new OzoneConfiguration();
+
+    // The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY
+    // is set differently.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
+    InetSocketAddress addr = OzoneClientUtils.getScmClientBindAddress(conf);
+    assertThat(addr.getHostString(), is("0.0.0.0"));
+    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
+
+    // The bind host should be 0.0.0.0 unless OZONE_SCM_CLIENT_BIND_HOST_KEY
+    // is set differently. The port number from OZONE_SCM_CLIENT_ADDRESS_KEY
+    // should be respected.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
+    addr = OzoneClientUtils.getScmClientBindAddress(conf);
+    assertThat(addr.getHostString(), is("0.0.0.0"));
+    assertThat(addr.getPort(), is(100));
+
+    // OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected.
+    // Port number should be default if none is specified via
+    // OZONE_SCM_DATANODE_ADDRESS_KEY.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4");
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8");
+    addr = OzoneClientUtils.getScmClientBindAddress(conf);
+    assertThat(addr.getHostString(), is("5.6.7.8"));
+    assertThat(addr.getPort(), is(
+        ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
+
+    // OZONE_SCM_CLIENT_BIND_HOST_KEY should be respected.
+    // Port number from OZONE_SCM_CLIENT_ADDRESS_KEY should be
+    // respected.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY, "5.6.7.8");
+    addr = OzoneClientUtils.getScmClientBindAddress(conf);
+    assertThat(addr.getHostString(), is("5.6.7.8"));
+    assertThat(addr.getPort(), is(100));
+  }
+
+  /**
+   * Verify that the DataNode endpoint bind address is computed correctly.
+   * This tests the logic used by the SCM to determine its own bind address.
+   */
+  @Test
+  public void testScmDataNodeBindHostDefault() {
+    final Configuration conf = new OzoneConfiguration();
+
+    // The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY
+    // is set differently.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4");
+    InetSocketAddress addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
+    assertThat(addr.getHostString(), is("0.0.0.0"));
+    assertThat(addr.getPort(), is(
+        ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+
+    // The bind host should be 0.0.0.0 unless OZONE_SCM_DATANODE_BIND_HOST_KEY
+    // is set differently. The port number from OZONE_SCM_DATANODE_ADDRESS_KEY
+    // should be respected.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
+    addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
+    assertThat(addr.getHostString(), is("0.0.0.0"));
+    assertThat(addr.getPort(), is(200));
+
+    // OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected.
+    // Port number should be default if none is specified via
+    // OZONE_SCM_DATANODE_ADDRESS_KEY.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8");
+    addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
+    assertThat(addr.getHostString(), is("5.6.7.8"));
+    assertThat(addr.getPort(), is(
+        ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+
+    // OZONE_SCM_DATANODE_BIND_HOST_KEY should be respected.
+    // Port number from OZONE_SCM_DATANODE_ADDRESS_KEY should be
+    // respected.
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "1.2.3.4:100");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "1.2.3.4:200");
+    conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY, "5.6.7.8");
+    addr = OzoneClientUtils.getScmDataNodeBindAddress(conf);
+    assertThat(addr.getHostString(), is("5.6.7.8"));
+    assertThat(addr.getPort(), is(200));
+  }
+
+  @Test
+  public void testGetSCMAddresses() {
+    final Configuration conf = new OzoneConfiguration();
+    Collection<InetSocketAddress> addresses = null;
+    InetSocketAddress addr = null;
+    Iterator<InetSocketAddress> it = null;
+
+    // Verify valid IP address setup
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
+    addresses = OzoneClientUtils.getSCMAddresses(conf);
+    assertThat(addresses.size(), is(1));
+    addr = addresses.iterator().next();
+    assertThat(addr.getHostName(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT));
+
+    // Verify valid hostname setup
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
+    addresses = OzoneClientUtils.getSCMAddresses(conf);
+    assertThat(addresses.size(), is(1));
+    addr = addresses.iterator().next();
+    assertThat(addr.getHostName(), is("scm1"));
+    assertThat(addr.getPort(), is(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT));
+
+    // Verify valid hostname and port
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
+    addresses = OzoneClientUtils.getSCMAddresses(conf);
+    assertThat(addresses.size(), is(1));
+    addr = addresses.iterator().next();
+    assertThat(addr.getHostName(), is("scm1"));
+    assertThat(addr.getPort(), is(1234));
+
+    final HashMap<String, Integer> hostsAndPorts =
+        new HashMap<String, Integer>();
+    hostsAndPorts.put("scm1", 1234);
+    hostsAndPorts.put("scm2", 2345);
+    hostsAndPorts.put("scm3", 3456);
+
+    // Verify multiple hosts and port
+    conf.setStrings(
+        ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
+    addresses = OzoneClientUtils.getSCMAddresses(conf);
+    assertThat(addresses.size(), is(3));
+    it = addresses.iterator();
+    HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
+    while(it.hasNext()) {
+      InetSocketAddress current = it.next();
+      assertTrue(expected1.remove(current.getHostName(),
+          current.getPort()));
+    }
+    assertTrue(expected1.isEmpty());
+
+    // Verify names with spaces
+    conf.setStrings(
+        ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
+    addresses = OzoneClientUtils.getSCMAddresses(conf);
+    assertThat(addresses.size(), is(3));
+    it = addresses.iterator();
+    HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
+    while(it.hasNext()) {
+      InetSocketAddress current = it.next();
+      assertTrue(expected2.remove(current.getHostName(),
+          current.getPort()));
+    }
+    assertTrue(expected2.isEmpty());
+
+    // Verify empty value
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
+    try {
+      addresses = OzoneClientUtils.getSCMAddresses(conf);
+      fail("Empty value should cause an IllegalArgumentException");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalArgumentException);
+    }
+
+    // Verify invalid hostname
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
+    try {
+      addresses = OzoneClientUtils.getSCMAddresses(conf);
+      fail("An invalid hostname should cause an IllegalArgumentException");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalArgumentException);
+    }
+
+    // Verify invalid port
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
+    try {
+      addresses = OzoneClientUtils.getSCMAddresses(conf);
+      fail("An invalid port should cause an IllegalArgumentException");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalArgumentException);
+    }
+
+    // Verify a mixed case (valid and invalid value both appears)
+    conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
+    try {
+      addresses = OzoneClientUtils.getSCMAddresses(conf);
+      fail("An invalid value should cause an IllegalArgumentException");
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalArgumentException);
+    }
+  }
+
+  @Test
+  public void testGetKSMAddress() {
+    final Configuration conf = new OzoneConfiguration();
+
+    // First try a client address with just a host name. Verify it falls
+    // back to the default port.
+    conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "1.2.3.4");
+    InetSocketAddress addr = OzoneClientUtils.getKsmAddress(conf);
+    assertThat(addr.getHostString(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(KSMConfigKeys.OZONE_KSM_PORT_DEFAULT));
+
+    // Next try a client address with just a host name and port. Verify the port
+    // is ignored and the default KSM port is used.
+    conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "1.2.3.4:100");
+    addr = OzoneClientUtils.getKsmAddress(conf);
+    assertThat(addr.getHostString(), is("1.2.3.4"));
+    assertThat(addr.getPort(), is(100));
+
+    // Assert the we are able to use default configs if no value is specified.
+    conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "");
+    addr = OzoneClientUtils.getKsmAddress(conf);
+    assertThat(addr.getHostString(), is("0.0.0.0"));
+    assertThat(addr.getPort(), is(KSMConfigKeys.OZONE_KSM_PORT_DEFAULT));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/package-info.java
new file mode 100644
index 0000000..be63eab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+/**
+ * This package contains test classes for Ozone Client.
+ */
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org