Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/12/16 02:10:54 UTC

[27/46] hadoop git commit: HDFS-12591. [READ] Implement LevelDBFileRegionFormat. Contributed by Ewan Higgs.

HDFS-12591. [READ] Implement LevelDBFileRegionFormat. Contributed by Ewan Higgs.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b634053c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b634053c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b634053c

Branch: refs/heads/trunk
Commit: b634053c4daec181511abb314aeef0a8fe851086
Parents: 352f994
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Sat Dec 2 12:22:00 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 15 17:51:40 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../impl/LevelDBFileRegionAliasMap.java         | 257 +++++++++++++++++++
 .../impl/TestLevelDBFileRegionAliasMap.java     | 115 +++++++++
 3 files changed, 374 insertions(+)
----------------------------------------------------------------------
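
In short, the patch adds a LevelDB-backed BlockAliasMap: each FileRegion is stored as a LevelDB key/value pair, with the protobuf-encoded Block as the key and the protobuf-encoded ProvidedStorageLocation as the value. A condensed sketch of the write/read round trip that the new test exercises (the wrapper class name and the /tmp path are placeholders, not part of this patch):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.common.FileRegion;
    import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
    import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;

    public class LevelDBAliasMapRoundTrip {
      public static void main(String[] args) throws Exception {
        LevelDBFileRegionAliasMap aliasMap = new LevelDBFileRegionAliasMap();
        LevelDBFileRegionAliasMap.LevelDBOptions opts =
            new LevelDBFileRegionAliasMap.LevelDBOptions()
                .filename("/tmp/alias-map-leveldb");   // placeholder path

        // Store one region, mirroring the values used in testReadBack below.
        BlockAliasMap.Writer<FileRegion> writer = aliasMap.getWriter(opts);
        writer.store(new FileRegion(1, new Path("/file"), 1, 1, 1));
        writer.close();

        // Resolve the same block; the lookup key is the protobuf encoding of the Block.
        BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(opts);
        FileRegion region = reader.resolve(new Block(1, 1, 1)).get();
        reader.close();
      }
    }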


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b634053c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 00976f9..7db0a8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -352,6 +352,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec";
   public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path";
 
+  public static final String DFS_PROVIDED_ALIASMAP_LEVELDB_PATH = "dfs.provided.aliasmap.leveldb.read.path";
+
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
   public static final String  DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";

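The new constant resolves to the property name dfs.provided.aliasmap.leveldb.read.path; as the LevelDBOptions class added below shows, the same key supplies the LevelDB path for both the reader and the writer. A minimal sketch of the Configuration-driven setup, assuming a writable placeholder path (the wrapper class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.common.FileRegion;
    import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
    import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;

    public class LevelDBAliasMapFromConf {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH,
            "/tmp/alias-map-leveldb");                 // placeholder path

        LevelDBFileRegionAliasMap aliasMap = new LevelDBFileRegionAliasMap();
        aliasMap.setConf(conf);  // the path is picked up via LevelDBOptions.setConf

        // Passing null options falls back to the conf-derived LevelDBOptions.
        BlockAliasMap.Writer<FileRegion> writer = aliasMap.getWriter(null);
        writer.close();          // opened with createIfMissing=true, creating the DB
        BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
        reader.close();
      }
    }
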
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b634053c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java
new file mode 100644
index 0000000..66971a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/LevelDBFileRegionAliasMap.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import static org.fusesource.leveldbjni.JniDBFactory.factory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
+import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromBlockBytes;
+import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.fromProvidedStorageLocationBytes;
+import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.toProtoBufBytes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A LevelDB based implementation of {@link BlockAliasMap}.
+ */
+public class LevelDBFileRegionAliasMap
+      extends BlockAliasMap<FileRegion> implements Configurable {
+
+  private Configuration conf;
+  private LevelDBOptions opts = new LevelDBOptions();
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(LevelDBFileRegionAliasMap.class);
+
+  @Override
+  public void setConf(Configuration conf) {
+    opts.setConf(conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Reader<FileRegion> getReader(Reader.Options opts) throws IOException {
+    if (null == opts) {
+      opts = this.opts;
+    }
+    if (!(opts instanceof LevelDBOptions)) {
+      throw new IllegalArgumentException("Invalid options " + opts.getClass());
+    }
+    LevelDBOptions o = (LevelDBOptions) opts;
+    return new LevelDBFileRegionAliasMap.LevelDBReader(
+        createDB(o.levelDBPath, false));
+  }
+
+  @Override
+  public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
+    if (null == opts) {
+      opts = this.opts;
+    }
+    if (!(opts instanceof LevelDBOptions)) {
+      throw new IllegalArgumentException("Invalid options " + opts.getClass());
+    }
+    LevelDBOptions o = (LevelDBOptions) opts;
+    return new LevelDBFileRegionAliasMap.LevelDBWriter(
+        createDB(o.levelDBPath, true));
+  }
+
+  private static DB createDB(String levelDBPath, boolean createIfMissing)
+      throws IOException {
+    if (levelDBPath == null || levelDBPath.length() == 0) {
+      throw new IllegalArgumentException(
+          "A valid path needs to be specified for "
+              + LevelDBFileRegionAliasMap.class + " using the parameter "
+              + DFS_PROVIDED_ALIASMAP_LEVELDB_PATH);
+    }
+    org.iq80.leveldb.Options options = new org.iq80.leveldb.Options();
+    options.createIfMissing(createIfMissing);
+    return factory.open(new File(levelDBPath), options);
+  }
+
+  @Override
+  public void refresh() throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Do nothing.
+  }
+
+  /**
+   * Class specifying reader and writer options for the
+   * {@link LevelDBFileRegionAliasMap}.
+   */
+  public static class LevelDBOptions implements LevelDBReader.Options,
+      LevelDBWriter.Options, Configurable {
+    private Configuration conf;
+    private String levelDBPath;
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      this.levelDBPath = conf.get(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public LevelDBOptions filename(String levelDBPath) {
+      this.levelDBPath = levelDBPath;
+      return this;
+    }
+  }
+
+  /**
+   * This class is used as a reader for block maps which
+   * are stored as LevelDB files.
+   */
+  public static class LevelDBReader extends Reader<FileRegion> {
+
+    /**
+     * Options for {@link LevelDBReader}.
+     */
+    public interface Options extends Reader.Options {
+      Options filename(String levelDBPath);
+    }
+
+    private DB db;
+
+    LevelDBReader(DB db) {
+      this.db = db;
+    }
+
+    @Override
+    public Optional<FileRegion> resolve(Block block) throws IOException {
+      if (db == null) {
+        return Optional.empty();
+      }
+      // consider layering index w/ composable format
+      byte[] key = toProtoBufBytes(block);
+      byte[] value = db.get(key);
+      ProvidedStorageLocation psl = fromProvidedStorageLocationBytes(value);
+      return Optional.of(new FileRegion(block, psl));
+    }
+
+    static class FRIterator implements Iterator<FileRegion> {
+      private final DBIterator internal;
+
+      FRIterator(DBIterator internal) {
+        this.internal = internal;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return internal.hasNext();
+      }
+
+      @Override
+      public FileRegion next() {
+        Map.Entry<byte[], byte[]> entry = internal.next();
+        if (entry == null) {
+          return null;
+        }
+        try {
+          Block block = fromBlockBytes(entry.getKey());
+          ProvidedStorageLocation psl =
+              fromProvidedStorageLocationBytes(entry.getValue());
+          return new FileRegion(block, psl);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    public Iterator<FileRegion> iterator() {
+      if (db == null) {
+        return null;
+      }
+      DBIterator iterator = db.iterator();
+      iterator.seekToFirst();
+      return new FRIterator(iterator);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (db != null) {
+        db.close();
+      }
+    }
+  }
+
+  /**
+   * This class is used as a writer for block maps which
+   * are stored as LevelDB files.
+   */
+  public static class LevelDBWriter extends Writer<FileRegion> {
+
+    /**
+     * Interface for Writer options.
+     */
+    public interface Options extends Writer.Options {
+      Options filename(String levelDBPath);
+    }
+
+    private final DB db;
+
+    LevelDBWriter(DB db) {
+      this.db = db;
+    }
+
+    @Override
+    public void store(FileRegion token) throws IOException {
+      byte[] key = toProtoBufBytes(token.getBlock());
+      byte[] value = toProtoBufBytes(token.getProvidedStorageLocation());
+      db.put(key, value);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (db != null) {
+        db.close();
+      }
+    }
+  }
+}

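Besides point lookups through resolve(), LevelDBReader supports a full scan: iterator() seeks to the first key and decodes each entry back into a FileRegion, which is what the test below relies on. A short sketch of such a scan over an existing alias map (placeholder path and wrapper class again):

    import java.util.Iterator;

    import org.apache.hadoop.hdfs.server.common.FileRegion;
    import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
    import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;

    public class LevelDBAliasMapScan {
      public static void main(String[] args) throws Exception {
        LevelDBFileRegionAliasMap aliasMap = new LevelDBFileRegionAliasMap();
        LevelDBFileRegionAliasMap.LevelDBOptions opts =
            new LevelDBFileRegionAliasMap.LevelDBOptions()
                .filename("/tmp/alias-map-leveldb");   // existing DB, placeholder path

        BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(opts);
        Iterator<FileRegion> it = reader.iterator();
        while (it.hasNext()) {
          FileRegion region = it.next();
          System.out.println(region.getBlock() + " -> "
              + region.getProvidedStorageLocation());
        }
        reader.close();
      }
    }
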
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b634053c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java
new file mode 100644
index 0000000..21199e1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestLevelDBFileRegionAliasMap.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests for the {@link LevelDBFileRegionAliasMap}.
+ */
+public class TestLevelDBFileRegionAliasMap {
+
+  /**
+   * A basic test to verify that we can write data and read it back again.
+   * @throws Exception
+   */
+  @Test
+  public void testReadBack() throws Exception {
+    File dbFile = Files.createTempDirectory("fileregionformat")
+        .toFile();
+    try {
+      LevelDBFileRegionAliasMap frf = new LevelDBFileRegionAliasMap();
+      LevelDBFileRegionAliasMap.LevelDBOptions opts =
+          new LevelDBFileRegionAliasMap.LevelDBOptions()
+              .filename(dbFile.getAbsolutePath());
+      BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts);
+
+      FileRegion fr = new FileRegion(1, new Path("/file"), 1, 1, 1);
+      writer.store(fr);
+      writer.close();
+
+      BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts);
+      FileRegion fr2 = reader.resolve(new Block(1, 1, 1)).get();
+      assertEquals(fr, fr2);
+      reader.close();
+    } finally {
+      dbFile.delete();
+    }
+  }
+
+  /**
+   * A basic test to verify that we can iterate over regions we've written.
+   */
+  @Test
+  public void testIterate() throws Exception {
+    FileRegion[] regions = new FileRegion[10];
+    regions[0] = new FileRegion(1, new Path("/file1"), 0, 1024, 1);
+    regions[1] = new FileRegion(2, new Path("/file1"), 1024, 1024, 1);
+    regions[2] = new FileRegion(3, new Path("/file1"), 2048, 1024, 1);
+    regions[3] = new FileRegion(4, new Path("/file2"), 0, 1024, 1);
+    regions[4] = new FileRegion(5, new Path("/file2"), 1024, 1024, 1);
+    regions[5] = new FileRegion(6, new Path("/file2"), 2048, 1024, 1);
+    regions[6] = new FileRegion(7, new Path("/file2"), 3072, 1024, 1);
+    regions[7] = new FileRegion(8, new Path("/file3"), 0, 1024, 1);
+    regions[8] = new FileRegion(9, new Path("/file4"), 0, 1024, 1);
+    regions[9] = new FileRegion(10, new Path("/file5"), 0, 1024,  1);
+    File dbFile = Files.createTempDirectory("fileregionformat")
+        .toFile();
+    try {
+      LevelDBFileRegionAliasMap frf = new LevelDBFileRegionAliasMap();
+      LevelDBFileRegionAliasMap.LevelDBOptions opts =
+          new LevelDBFileRegionAliasMap.LevelDBOptions()
+              .filename(dbFile.getAbsolutePath());
+      BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts);
+
+      for (FileRegion fr : regions) {
+        writer.store(fr);
+      }
+      writer.close();
+
+      BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts);
+      Iterator<FileRegion> it = reader.iterator();
+      int last = -1;
+      int count = 0;
+      while(it.hasNext()) {
+        FileRegion fr = it.next();
+        int blockId = (int)fr.getBlock().getBlockId();
+        assertEquals(regions[blockId-1], fr);
+        assertNotEquals(blockId, last);
+        last = blockId;
+        count++;
+      }
+      assertEquals(10, count);
+
+      reader.close();
+    } finally {
+      dbFile.delete();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org