Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/09/27 02:27:55 UTC

hadoop git commit: HDFS-12425. Ozone: OzoneFileSystem: read/write/create/open/getFileInfo APIs. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 9fe5a931b -> a7df79ca5


HDFS-12425. Ozone: OzoneFileSystem: read/write/create/open/getFileInfo APIs. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7df79ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7df79ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7df79ca

Branch: refs/heads/HDFS-7240
Commit: a7df79ca56a84323b50f7248b010ff1220ce3231
Parents: 9fe5a93
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Sep 26 19:26:40 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Sep 26 19:26:40 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java |   3 +-
 .../org/apache/hadoop/fs/ozone/Constants.java   |   8 +
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java | 136 ++++++++++++-
 .../hadoop/fs/ozone/OzoneInputStream.java       | 191 +++++++++++++++++++
 .../hadoop/fs/ozone/OzoneOutputStream.java      | 113 +++++++++++
 .../fs/ozone/TestOzoneFileInterfaces.java       |  27 +++
 6 files changed, 470 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
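
For context, here is a minimal usage sketch of the APIs this patch fills in,
driven through the standard FileSystem interface much like the new test in
TestOzoneFileInterfaces below. It is a sketch only: the host, volume and
bucket names are placeholders, the volume and bucket are assumed to exist
already, and the scheme must be mapped to OzoneFileSystem (e.g. via
fs.<scheme>.impl) before FileSystem.get() can resolve it.

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;

  public class OzoneFsSmokeSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // ozfs URIs are parsed in OzoneFileSystem#initialize() as
      // <scheme>://<host>/<volume>/<bucket>/
      FileSystem fs = FileSystem.get(
          URI.create(OZONE_URI_SCHEME + "://localhost/volume1/bucket1/"), conf);

      Path file = new Path("/demo.txt");
      try (FSDataOutputStream out = fs.create(file, true)) { // new create() path
        out.writeBytes("hello ozone");
      }                                                      // uploaded on close()

      FileStatus status = fs.getFileStatus(file);            // new getFileStatus()
      try (FSDataInputStream in = fs.open(file)) {           // new open() path
        byte[] buf = new byte[(int) status.getLen()];
        in.readFully(0, buf);                                // positioned readFully
      }
    }
  }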


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index e1ef754..301d5e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -192,7 +192,8 @@ public class KeyManagerImpl implements KeyManager {
           volumeName, bucketName, keyName);
       byte[] value = metadataManager.get(keyKey);
       if (value == null) {
-        LOG.debug("Key: {} not found", keyKey);
+        LOG.debug("volume:{} bucket:{} Key:{} not found",
+            volumeName, bucketName, keyName);
         throw new KSMException("Key not found",
             KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
index 45c4172..0271d6c 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
@@ -31,6 +31,14 @@ public class Constants {
 
   public static final String OZONE_USER_DIR = "/user";
 
+  /** Local buffer directory. */
+  public static final String BUFFER_DIR_KEY = "fs.ozone.buffer.dir";
+
+  /** Temporary directory. */
+  public static final String BUFFER_TMP_KEY = "hadoop.tmp.dir";
+
+  public static final String OZONE_URI_DELIMITER = "/";
+
   private Constants() {
 
   }
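
The two keys added here control where the new OzoneInputStream and
OzoneOutputStream spool data on local disk; when fs.ozone.buffer.dir is not
set, both streams fall back to <hadoop.tmp.dir>/ozone. A minimal
configuration sketch (the directory path below is illustrative only):

  Configuration conf = new Configuration();
  // Optional: point the local spool directory at a disk with enough space;
  // otherwise the streams derive it as conf.get("hadoop.tmp.dir") + "/ozone".
  conf.set("fs.ozone.buffer.dir", "/data/tmp/ozone-buffer");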

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index ff3340e..f6260ff 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -18,13 +18,18 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.ParseException;
 import java.util.EnumSet;
 import java.util.Objects;
 
+import org.apache.hadoop.ozone.web.client.OzoneKey;
 import org.apache.hadoop.ozone.web.client.OzoneRestClient;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ozone.web.client.OzoneBucket;
 import org.apache.hadoop.ozone.web.client.OzoneVolume;
@@ -48,6 +54,7 @@ import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
 
 /**
  * The Ozone Filesystem implementation.
@@ -76,13 +83,11 @@ public class OzoneFileSystem extends FileSystem {
     Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
     assert getScheme().equals(name.getScheme());
 
-    uri = name;
     Path path = new Path(name.getPath());
     String hostStr = name.getAuthority();
     String volumeStr = null;
     String bucketStr = null;
 
-    LOG.info("Ozone URI for ozfs initialization is " + uri);
     while (path != null && !path.isRoot()) {
       bucketStr = volumeStr;
       volumeStr = path.getName();
@@ -98,6 +103,10 @@ public class OzoneFileSystem extends FileSystem {
     }
 
     try {
+      uri = new URIBuilder().setScheme(OZONE_URI_SCHEME).setHost(hostStr)
+          .setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
+              + bucketStr + OZONE_URI_DELIMITER).build();
+      LOG.info("Ozone URI for ozfs initialization is " + uri);
       this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr);
       try {
         this.userName =
@@ -143,7 +152,16 @@ public class OzoneFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return null;
+    LOG.trace("open() path:{}", f);
+    final FileStatus fileStatus = getFileStatus(f);
+
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException("Can't open directory " + f + " to read");
+    }
+
+    return new FSDataInputStream(
+        new OzoneInputStream(getConf(), uri, bucket, pathToKey(f),
+            fileStatus.getLen(), bufferSize, statistics));
   }
 
   @Override
@@ -151,7 +169,31 @@ public class OzoneFileSystem extends FileSystem {
                                    boolean overwrite, int bufferSize,
                                    short replication, long blockSize,
                                    Progressable progress) throws IOException {
-    return null;
+    LOG.trace("create() path:{}", f);
+    final String key = pathToKey(f);
+    final FileStatus status;
+    try {
+      status = getFileStatus(f);
+      if (status.isDirectory()) {
+        throw new FileAlreadyExistsException(f + " is a directory");
+      } else {
+        if (!overwrite) {
+          // path references a file and overwrite is disabled
+          throw new FileAlreadyExistsException(f + " already exists");
+        }
+        LOG.debug("Overwriting file {}", f);
+        //TODO: Delete the existing file here
+      }
+    } catch (FileNotFoundException ignored) {
+      // This exception needs to be ignored: it means the file does not
+      // currently exist, so a new file can be created.
+    }
+
+    final OzoneOutputStream stream =
+        new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics);
+    // We pass null to FSDataOutputStream so it won't count writes that
+    // are being buffered to a file
+    return new FSDataOutputStream(stream, null);
   }
 
   @Override
@@ -162,13 +204,22 @@ public class OzoneFileSystem extends FileSystem {
       short replication,
       long blockSize,
       Progressable progress) throws IOException {
-    return null;
+    final Path parent = path.getParent();
+    if (parent != null) {
+      // expect this to raise an exception if there is no parent
+      if (!getFileStatus(parent).isDirectory()) {
+        throw new FileAlreadyExistsException("Not a directory: " + parent);
+      }
+    }
+    return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
+        bufferSize, replication, blockSize, progress);
   }
 
   @Override
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
-    return null;
+    throw new UnsupportedOperationException("append() Not implemented by the "
+        + getClass().getSimpleName() + " FileSystem implementation");
   }
 
   @Override
@@ -201,9 +252,80 @@ public class OzoneFileSystem extends FileSystem {
     return false;
   }
 
+  private OzoneKey getKeyStatus(String keyName) {
+    try {
+      return bucket.getKeyInfo(keyName);
+    } catch (OzoneException e) {
+      LOG.trace("Key:{} does not exists", keyName);
+      return null;
+    }
+  }
+
+  private long getModifiedTime(String modifiedTime, String key) {
+    try {
+      return OzoneUtils.formatDate(modifiedTime);
+    } catch (ParseException pe) {
+      LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
+      return 0;
+    }
+  }
+
+  private boolean isDirectory(OzoneKey key) {
+    LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
+        key.getObjectInfo().getSize());
+    return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
+        && (key.getObjectInfo().getSize() == 0);
+  }
+
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    return null;
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+    String key = pathToKey(qualifiedPath);
+
+    if (key.length() == 0) {
+      return new FileStatus(0, true, 1, 0,
+          getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER),
+          qualifiedPath);
+    }
+
+    // consider this a file and get key status
+    OzoneKey meta = getKeyStatus(key);
+    if (meta == null && !key.endsWith(OZONE_URI_DELIMITER)) {
+      // if that fails consider this a directory
+      key += OZONE_URI_DELIMITER;
+      meta = getKeyStatus(key);
+    }
+
+    if (meta == null) {
+      LOG.trace("File:{} not found", f);
+      throw new FileNotFoundException(f + ": No such file or directory!");
+    } else if (isDirectory(meta)) {
+      return new FileStatus(0, true, 1, 0,
+          getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
+          qualifiedPath);
+    } else {
+      return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
+            getDefaultBlockSize(f),
+          getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
+          qualifiedPath);
+    }
+  }
+
+  /**
+   * Turn a path (relative or otherwise) into an Ozone key.
+   *
+   * @param path the path of the file.
+   * @return the key of the object that represents the file.
+   */
+  private String pathToKey(Path path) {
+    Objects.requireNonNull(path, "Path can not be null!");
+    if (!path.isAbsolute()) {
+      path = new Path(workingDir, path);
+    }
+    // removing leading '/' char
+    String key = path.toUri().getPath().substring(1);
+    LOG.trace("path for key:{} is:{}", key, path);
+    return key;
   }
 
   @Override
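
A quick restatement of the path/key convention that the new getFileStatus(),
isDirectory() and pathToKey() code above relies on. The helper class below is
hypothetical and only mirrors the logic in this patch:

  /** Hypothetical helper restating the mapping used by OzoneFileSystem. */
  final class OzoneKeyConvention {
    private static final String DELIMITER = "/"; // Constants.OZONE_URI_DELIMITER

    /** "/dir/file.txt" -> key "dir/file.txt"; a directory "/dir" -> "dir/". */
    static String pathToKey(String absolutePath, boolean isDirectory) {
      String key = absolutePath.substring(1);    // drop the leading '/'
      return isDirectory ? key + DELIMITER : key;
    }

    /** Mirrors OzoneFileSystem#isDirectory(): a zero-byte key ending in '/'. */
    static boolean isDirectoryKey(String keyName, long size) {
      return keyName.endsWith(DELIMITER) && size == 0;
    }
  }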

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
new file mode 100644
index 0000000..07733e5
--- /dev/null
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.ozone.web.client.OzoneBucket;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
+
+/**
+ * Input stream for the Ozone file system; reads keys via a local temp copy.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class OzoneInputStream extends FSInputStream {
+  private static final Log LOG = LogFactory.getLog(OzoneInputStream.class);
+
+  private final RandomAccessFile in;
+
+  /** Closed bit. Volatile so reads are non-blocking. */
+  private volatile boolean closed = false;
+
+  /** the ozone bucket client. */
+  private final OzoneBucket bucket;
+
+  /** The object key. */
+  private final String key;
+
+  /** Object content length. */
+  private final long contentLen;
+
+  /** file system stats. */
+  private final Statistics stats;
+
+  private final URI keyUri;
+
+  OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
+      String key, long contentLen, int bufferSize, Statistics statistics)
+      throws IOException {
+    Objects.requireNonNull(bucket, "bucket can not be null!");
+    Objects.requireNonNull(key, "kenName can not be null!");
+    this.bucket = bucket;
+    this.key = key;
+    this.contentLen = contentLen;
+    this.stats = statistics;
+    this.keyUri = fsUri.resolve(key);
+
+    if (conf.get(BUFFER_DIR_KEY) == null) {
+      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
+    }
+    final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+    final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
+        LocalDirAllocator.SIZE_UNKNOWN, conf);
+    try {
+      LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath());
+      bucket.getKey(this.key, tmpFile.toPath());
+      in = new RandomAccessFile(tmpFile, "r");
+      statistics.incrementReadOps(1);
+    } catch (OzoneException oe) {
+      final String msg = "Error when getBytes for key = " + key;
+      LOG.error(msg, oe);
+      throw new IOException(msg, oe);
+    }
+  }
+
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    checkNotClosed();
+    // Do not allow negative seek
+    if (targetPos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos);
+    }
+
+    if (this.contentLen <= 0) {
+      return;
+    }
+
+    in.seek(targetPos);
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    checkNotClosed();
+    return in.getFilePointer();
+  }
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    int ch = in.read();
+    if (stats != null && ch != -1) {
+      stats.incrementBytesRead(1);
+    }
+    return ch;
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    Preconditions.checkArgument(buffer != null, "buffer can not be null");
+    int numberOfByteRead = super.read(position, buffer, offset, length);
+
+    if (stats != null && numberOfByteRead > 0) {
+      stats.incrementBytesRead(numberOfByteRead);
+    }
+    return numberOfByteRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buffer, int offset, int length)
+      throws IOException {
+    Preconditions.checkArgument(buffer != null, "buffer can not be null");
+    int numberOfByteRead = in.read(buffer, offset, length);
+    if (stats != null && numberOfByteRead > 0) {
+      stats.incrementBytesRead(numberOfByteRead);
+    }
+    return numberOfByteRead;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+
+    final long remainingInWrapped = contentLen - in.getFilePointer();
+    return (remainingInWrapped < Integer.MAX_VALUE)
+        ? (int)remainingInWrapped
+        : Integer.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  public synchronized long skip(long pos) throws IOException {
+    return in.skipBytes((int) pos);
+  }
+
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(this.keyUri + ": "
+          + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
new file mode 100644
index 0000000..bf93c9e
--- /dev/null
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.ozone.web.client.OzoneBucket;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
+
+
+/**
+ * The output stream for Ozone file system.
+ *
+ * Data will be buffered on local disk, then uploaded to Ozone in
+ * {@link #close()} method.
+ *
+ * This class is not thread safe.
+ */
+public class OzoneOutputStream extends OutputStream {
+  private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class);
+  private OzoneBucket bucket;
+  private final String key;
+  private final URI keyUri;
+  private Statistics statistics;
+  private LocalDirAllocator dirAlloc;
+  private boolean closed;
+  private File tmpFile;
+  private BufferedOutputStream backupStream;
+
+  OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
+      String key, Statistics statistics) throws IOException {
+    this.bucket = bucket;
+    this.key = key;
+    this.keyUri = fsUri.resolve(key);
+    this.statistics = statistics;
+
+    if (conf.get(BUFFER_DIR_KEY) == null) {
+      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
+    }
+    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+    tmpFile = dirAlloc.createTmpFileForWrite("output-",
+        LocalDirAllocator.SIZE_UNKNOWN, conf);
+    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
+
+    closed = false;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (backupStream != null) {
+      backupStream.close();
+    }
+    try {
+      LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri);
+      bucket.putKey(key, tmpFile);
+      statistics.incrementWriteOps(1);
+    } catch (OzoneException oe) {
+      final String msg = "Uploading error: file=" + tmpFile + ", key=" + key;
+      LOG.error(msg, oe);
+      throw new IOException(msg, oe);
+    } finally {
+      if (!tmpFile.delete()) {
+        LOG.warn("Can not delete tmpFile: " + tmpFile);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    backupStream.flush();
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    backupStream.write(b);
+    statistics.incrementBytesWritten(1);
+  }
+
+}
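
One caller-visible consequence of the design above: the stream spools to a
local temp file and only calls bucket.putKey() from close(), so flush() only
reaches the local buffer and nothing becomes visible in Ozone until the
stream is closed. A hedged sketch (fs and the path are as in the earlier
usage sketch):

  try (FSDataOutputStream out = fs.create(new Path("/demo.txt"), true)) {
    out.writeBytes("event-1\n");
    out.flush();            // flushes only to the local buffer file, not to Ozone
    out.writeBytes("event-2\n");
  }                         // close() uploads the whole temp file as one key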

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
index 4697537..24b40d8 100644
--- a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
+++ b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.fs.ozone;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -32,6 +36,7 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -96,4 +101,26 @@ public class TestOzoneFileInterfaces {
     Assert.assertTrue(fs instanceof OzoneFileSystem);
     Assert.assertEquals(fs.getUri().getScheme(), Constants.OZONE_URI_SCHEME);
   }
+
+  @Test
+  public void testOzFsReadWrite() throws IOException {
+    long currentTime = Time.now();
+    int stringLen = 20;
+    String data = RandomStringUtils.randomAlphanumeric(stringLen);
+    String filePath = RandomStringUtils.randomAlphanumeric(5);
+    Path path = new Path("/" + filePath);
+    try (FSDataOutputStream stream = fs.create(path)) {
+      stream.writeBytes(data);
+    }
+
+    FileStatus status = fs.getFileStatus(path);
+    Assert.assertTrue(status.getModificationTime() < currentTime);
+
+    try (FSDataInputStream inputStream = fs.open(path)) {
+      byte[] buffer = new byte[stringLen];
+      inputStream.readFully(0, buffer);
+      String out = new String(buffer, 0, buffer.length);
+      Assert.assertEquals(out, data);
+    }
+  }
 }

