You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/09/27 02:27:55 UTC
hadoop git commit: HDFS-12425. Ozone: OzoneFileSystem:
read/write/create/open/getFileInfo APIs. Contributed by Mukul Kumar Singh.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 9fe5a931b -> a7df79ca5
HDFS-12425. Ozone: OzoneFileSystem: read/write/create/open/getFileInfo APIs. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7df79ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7df79ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7df79ca
Branch: refs/heads/HDFS-7240
Commit: a7df79ca56a84323b50f7248b010ff1220ce3231
Parents: 9fe5a93
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Sep 26 19:26:40 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Sep 26 19:26:40 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/ozone/ksm/KeyManagerImpl.java | 3 +-
.../org/apache/hadoop/fs/ozone/Constants.java | 8 +
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 136 ++++++++++++-
.../hadoop/fs/ozone/OzoneInputStream.java | 191 +++++++++++++++++++
.../hadoop/fs/ozone/OzoneOutputStream.java | 113 +++++++++++
.../fs/ozone/TestOzoneFileInterfaces.java | 27 +++
6 files changed, 470 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index e1ef754..301d5e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -192,7 +192,8 @@ public class KeyManagerImpl implements KeyManager {
volumeName, bucketName, keyName);
byte[] value = metadataManager.get(keyKey);
if (value == null) {
- LOG.debug("Key: {} not found", keyKey);
+ LOG.debug("volume:{} bucket:{} Key:{} not found",
+ volumeName, bucketName, keyName);
throw new KSMException("Key not found",
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
index 45c4172..0271d6c 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
@@ -31,6 +31,14 @@ public class Constants {
public static final String OZONE_USER_DIR = "/user";
+ /** Local buffer directory. */
+ public static final String BUFFER_DIR_KEY = "fs.ozone.buffer.dir";
+
+ /** Temporary directory. */
+ public static final String BUFFER_TMP_KEY = "hadoop.tmp.dir";
+
+ public static final String OZONE_URI_DELIMITER = "/";
+
private Constants() {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index ff3340e..f6260ff 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -18,13 +18,18 @@
package org.apache.hadoop.fs.ozone;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.ParseException;
import java.util.EnumSet;
import java.util.Objects;
+import org.apache.hadoop.ozone.web.client.OzoneKey;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.web.client.OzoneVolume;
@@ -48,6 +54,7 @@ import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
+import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
/**
* The Ozone Filesystem implementation.
@@ -76,13 +83,11 @@ public class OzoneFileSystem extends FileSystem {
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
assert getScheme().equals(name.getScheme());
- uri = name;
Path path = new Path(name.getPath());
String hostStr = name.getAuthority();
String volumeStr = null;
String bucketStr = null;
- LOG.info("Ozone URI for ozfs initialization is " + uri);
while (path != null && !path.isRoot()) {
bucketStr = volumeStr;
volumeStr = path.getName();
@@ -98,6 +103,10 @@ public class OzoneFileSystem extends FileSystem {
}
try {
+ uri = new URIBuilder().setScheme(OZONE_URI_SCHEME).setHost(hostStr)
+ .setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
+ + bucketStr + OZONE_URI_DELIMITER).build();
+ LOG.info("Ozone URI for ozfs initialization is " + uri);
this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr);
try {
this.userName =
@@ -143,7 +152,16 @@ public class OzoneFileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- return null;
+ LOG.trace("open() path:{}", f);
+ final FileStatus fileStatus = getFileStatus(f);
+
+ if (fileStatus.isDirectory()) {
+ throw new FileNotFoundException("Can't open directory " + f + " to read");
+ }
+
+ return new FSDataInputStream(
+ new OzoneInputStream(getConf(), uri, bucket, pathToKey(f),
+ fileStatus.getLen(), bufferSize, statistics));
}
@Override
@@ -151,7 +169,31 @@ public class OzoneFileSystem extends FileSystem {
boolean overwrite, int bufferSize,
short replication, long blockSize,
Progressable progress) throws IOException {
- return null;
+ LOG.trace("create() path:{}", f);
+ final String key = pathToKey(f);
+ final FileStatus status;
+ try {
+ status = getFileStatus(f);
+ if (status.isDirectory()) {
+ throw new FileAlreadyExistsException(f + " is a directory");
+ } else {
+ if (!overwrite) {
+ // path references a file and overwrite is disabled
+ throw new FileAlreadyExistsException(f + " already exists");
+ }
+ LOG.debug("Overwriting file {}", f);
+ //TODO: Delete the existing file here
+ }
+ } catch (FileNotFoundException ignored) {
+ // This exception needs to ignored as this means that the file currently
+ // does not exists and a new file can thus be created.
+ }
+
+ final OzoneOutputStream stream =
+ new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics);
+ // We pass null to FSDataOutputStream so it won't count writes that
+ // are being buffered to a file
+ return new FSDataOutputStream(stream, null);
}
@Override
@@ -162,13 +204,22 @@ public class OzoneFileSystem extends FileSystem {
short replication,
long blockSize,
Progressable progress) throws IOException {
- return null;
+ final Path parent = path.getParent();
+ if (parent != null) {
+ // expect this to raise an exception if there is no parent
+ if (!getFileStatus(parent).isDirectory()) {
+ throw new FileAlreadyExistsException("Not a directory: " + parent);
+ }
+ }
+ return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
+ bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
- return null;
+ throw new UnsupportedOperationException("append() Not implemented by the "
+ + getClass().getSimpleName() + " FileSystem implementation");
}
@Override
@@ -201,9 +252,80 @@ public class OzoneFileSystem extends FileSystem {
return false;
}
+ private OzoneKey getKeyStatus(String keyName) {
+ try {
+ return bucket.getKeyInfo(keyName);
+ } catch (OzoneException e) {
+ LOG.trace("Key:{} does not exists", keyName);
+ return null;
+ }
+ }
+
+ private long getModifiedTime(String modifiedTime, String key) {
+ try {
+ return OzoneUtils.formatDate(modifiedTime);
+ } catch (ParseException pe) {
+ LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
+ return 0;
+ }
+ }
+
+ private boolean isDirectory(OzoneKey key) {
+ LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
+ key.getObjectInfo().getSize());
+ return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
+ && (key.getObjectInfo().getSize() == 0);
+ }
+
@Override
public FileStatus getFileStatus(Path f) throws IOException {
- return null;
+ Path qualifiedPath = f.makeQualified(uri, workingDir);
+ String key = pathToKey(qualifiedPath);
+
+ if (key.length() == 0) {
+ return new FileStatus(0, true, 1, 0,
+ getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER),
+ qualifiedPath);
+ }
+
+ // consider this a file and get key status
+ OzoneKey meta = getKeyStatus(key);
+ if (meta == null && !key.endsWith(OZONE_URI_DELIMITER)) {
+ // if that fails consider this a directory
+ key += OZONE_URI_DELIMITER;
+ meta = getKeyStatus(key);
+ }
+
+ if (meta == null) {
+ LOG.trace("File:{} not found", f);
+ throw new FileNotFoundException(f + ": No such file or directory!");
+ } else if (isDirectory(meta)) {
+ return new FileStatus(0, true, 1, 0,
+ getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
+ qualifiedPath);
+ } else {
+ return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
+ getDefaultBlockSize(f),
+ getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
+ qualifiedPath);
+ }
+ }
+
+ /**
+ * Turn a path (relative or otherwise) into an Ozone key.
+ *
+ * @param path the path of the file.
+ * @return the key of the object that represents the file.
+ */
+ private String pathToKey(Path path) {
+ Objects.requireNonNull(path, "Path can not be null!");
+ if (!path.isAbsolute()) {
+ path = new Path(workingDir, path);
+ }
+ // removing leading '/' char
+ String key = path.toUri().getPath().substring(1);
+ LOG.trace("path for key:{} is:{}", key, path);
+ return key;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
new file mode 100644
index 0000000..07733e5
--- /dev/null
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.ozone.web.client.OzoneBucket;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
+
+/**
+ * Wraps OzoneInputStream implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class OzoneInputStream extends FSInputStream {
+ private static final Log LOG = LogFactory.getLog(OzoneInputStream.class);
+
+ private final RandomAccessFile in;
+
+ /** Closed bit. Volatile so reads are non-blocking. */
+ private volatile boolean closed = false;
+
+ /** the ozone bucket client. */
+ private final OzoneBucket bucket;
+
+ /** The object key. */
+ private final String key;
+
+ /** Object content length. */
+ private final long contentLen;
+
+ /** file system stats. */
+ private final Statistics stats;
+
+ private final URI keyUri;
+
+ OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
+ String key, long contentLen, int bufferSize, Statistics statistics)
+ throws IOException {
+ Objects.requireNonNull(bucket, "bucket can not be null!");
+ Objects.requireNonNull(key, "kenName can not be null!");
+ this.bucket = bucket;
+ this.key = key;
+ this.contentLen = contentLen;
+ this.stats = statistics;
+ this.keyUri = fsUri.resolve(key);
+
+ if (conf.get(BUFFER_DIR_KEY) == null) {
+ conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
+ }
+ final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+ final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
+ LocalDirAllocator.SIZE_UNKNOWN, conf);
+ try {
+ LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath());
+ bucket.getKey(this.key, tmpFile.toPath());
+ in = new RandomAccessFile(tmpFile, "r");
+ statistics.incrementReadOps(1);
+ } catch (OzoneException oe) {
+ final String msg = "Error when getBytes for key = " + key;
+ LOG.error(msg, oe);
+ throw new IOException(msg, oe);
+ }
+ }
+
+ @Override
+ public synchronized void seek(long targetPos) throws IOException {
+ checkNotClosed();
+ // Do not allow negative seek
+ if (targetPos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos);
+ }
+
+ if (this.contentLen <= 0) {
+ return;
+ }
+
+ in.seek(targetPos);
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ checkNotClosed();
+ return in.getFilePointer();
+ }
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ int ch = in.read();
+ if (stats != null && ch != -1) {
+ stats.incrementBytesRead(1);
+ }
+ return ch;
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(buffer != null, "buffer can not be null");
+ int numberOfByteRead = super.read(position, buffer, offset, length);
+
+ if (stats != null && numberOfByteRead > 0) {
+ stats.incrementBytesRead(numberOfByteRead);
+ }
+ return numberOfByteRead;
+ }
+
+ @Override
+ public synchronized int read(byte[] buffer, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(buffer != null, "buffer can not be null");
+ int numberOfByteRead = in.read(buffer, offset, length);
+ if (stats != null && numberOfByteRead > 0) {
+ stats.incrementBytesRead(numberOfByteRead);
+ }
+ return numberOfByteRead;
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ checkNotClosed();
+
+ final long remainingInWrapped = contentLen - in.getFilePointer();
+ return (remainingInWrapped < Integer.MAX_VALUE)
+ ? (int)remainingInWrapped
+ : Integer.MAX_VALUE;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public synchronized long skip(long pos) throws IOException {
+ return in.skipBytes((int) pos);
+ }
+
+ /**
+ * Verify that the input stream is open. Non blocking; this gives
+ * the last state of the volatile {@link #closed} field.
+ * @throws IOException if the connection is closed.
+ */
+ private void checkNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException(this.keyUri + ": "
+ + FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
new file mode 100644
index 0000000..bf93c9e
--- /dev/null
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.ozone.web.client.OzoneBucket;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
+
+
+/**
+ * The output stream for Ozone file system.
+ *
+ * Data will be buffered on local disk, then uploaded to Ozone in
+ * {@link #close()} method.
+ *
+ * This class is not thread safe.
+ */
+public class OzoneOutputStream extends OutputStream {
+ private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class);
+ private OzoneBucket bucket;
+ private final String key;
+ private final URI keyUri;
+ private Statistics statistics;
+ private LocalDirAllocator dirAlloc;
+ private boolean closed;
+ private File tmpFile;
+ private BufferedOutputStream backupStream;
+
+ OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
+ String key, Statistics statistics) throws IOException {
+ this.bucket = bucket;
+ this.key = key;
+ this.keyUri = fsUri.resolve(key);
+ this.statistics = statistics;
+
+ if (conf.get(BUFFER_DIR_KEY) == null) {
+ conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
+ }
+ dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+ tmpFile = dirAlloc.createTmpFileForWrite("output-",
+ LocalDirAllocator.SIZE_UNKNOWN, conf);
+ backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
+
+ closed = false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (backupStream != null) {
+ backupStream.close();
+ }
+ try {
+ LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri);
+ bucket.putKey(key, tmpFile);
+ statistics.incrementWriteOps(1);
+ } catch (OzoneException oe) {
+ final String msg = "Uploading error: file=" + tmpFile + ", key=" + key;
+ LOG.error(msg, oe);
+ throw new IOException(msg, oe);
+ } finally {
+ if (!tmpFile.delete()) {
+ LOG.warn("Can not delete tmpFile: " + tmpFile);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ backupStream.flush();
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ backupStream.write(b);
+ statistics.incrementBytesWritten(1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7df79ca/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
index 4697537..24b40d8 100644
--- a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
+++ b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.fs.ozone;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -32,6 +36,7 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.util.Time;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
@@ -96,4 +101,26 @@ public class TestOzoneFileInterfaces {
Assert.assertTrue(fs instanceof OzoneFileSystem);
Assert.assertEquals(fs.getUri().getScheme(), Constants.OZONE_URI_SCHEME);
}
+
+ @Test
+ public void testOzFsReadWrite() throws IOException {
+ long currentTime = Time.now();
+ int stringLen = 20;
+ String data = RandomStringUtils.randomAlphanumeric(stringLen);
+ String filePath = RandomStringUtils.randomAlphanumeric(5);
+ Path path = new Path("/" + filePath);
+ try (FSDataOutputStream stream = fs.create(path)) {
+ stream.writeBytes(data);
+ }
+
+ FileStatus status = fs.getFileStatus(path);
+ Assert.assertTrue(status.getModificationTime() < currentTime);
+
+ try (FSDataInputStream inputStream = fs.open(path)) {
+ byte[] buffer = new byte[stringLen];
+ inputStream.readFully(0, buffer);
+ String out = new String(buffer, 0, buffer.length);
+ Assert.assertEquals(out, data);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org