You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2015/08/17 08:29:11 UTC

incubator-geode git commit: Add index region implementation

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-11 670ce25c7 -> a376120dd


Add index region implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a376120d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a376120d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a376120d

Branch: refs/heads/feature/GEODE-11
Commit: a376120dd437387e0e53db491afecbcfa00db7af
Parents: 670ce25
Author: zhouxh <gz...@pivotal.io>
Authored: Sun Aug 16 23:26:28 2015 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Sun Aug 16 23:26:28 2015 -0700

----------------------------------------------------------------------
 .../gemfire/cache/lucene/internal/ChunkKey.java |  66 +++++++
 .../gemfire/cache/lucene/internal/File.java     |  74 +++++++
 .../cache/lucene/internal/FileInputStream.java  |  97 +++++++++
 .../cache/lucene/internal/FileOutputStream.java |  70 +++++++
 .../cache/lucene/internal/FileSystem.java       | 111 +++++++++++
 .../cache/lucene/internal/RegionDirectory.java  | 195 +++++++++++++++++++
 6 files changed, 613 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java
new file mode 100644
index 0000000..86d9bcb
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java
@@ -0,0 +1,66 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.Serializable;
+
+/**
+ * Region key identifying one chunk of a stored file: the name of the owning file plus the
+ * zero-based position of the chunk within that file.
+ *
+ * <p>Equality and hashing are value-based over both fields. The fields are intentionally not
+ * final because {@code FileSystem} advances {@code chunkId} on a single instance while walking
+ * the chunks of a file; an instance must therefore never be mutated while it is stored as a key
+ * in a hash-based collection.
+ */
+public class ChunkKey implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  String fileName;
+  int chunkId;
+
+  /**
+   * @param fileName name of the file the chunk belongs to
+   * @param chunkId zero-based index of the chunk within the file
+   */
+  ChunkKey(String fileName, int chunkId) {
+    this.fileName = fileName;
+    this.chunkId = chunkId;
+  }
+
+  /**
+   * @return the fileName
+   */
+  public String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * @return the chunkId
+   */
+  public int getChunkId() {
+    return chunkId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    // equals() accepts a null fileName, so hashCode() must not throw for one either.
+    result = prime * result + (fileName == null ? 0 : fileName.hashCode());
+    result = prime * result + chunkId;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is required.
+    if (!(obj instanceof ChunkKey)) {
+      return false;
+    }
+    final ChunkKey other = (ChunkKey) obj;
+    if (chunkId != other.chunkId) {
+      return false;
+    }
+    return fileName == null ? other.fileName == null : fileName.equals(other.fileName);
+  }
+
+  @Override
+  public String toString() {
+    return "ChunkKey[fileName=" + fileName + ", chunkId=" + chunkId + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java
new file mode 100644
index 0000000..9df6641
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java
@@ -0,0 +1,74 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Metadata record for a single file kept in a {@link FileSystem}: its name, length in bytes,
+ * number of chunks and creation/modification timestamps.
+ *
+ * <p>The reference to the owning file system and the chunk size are transient, so they are not
+ * part of the serialized form; they are attached again through
+ * {@link #setFileSystem(FileSystem)}.
+ */
+public final class File implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // Runtime-only state, excluded from serialization.
+  private transient FileSystem fileSystem;
+  private transient int chunkSize;
+
+  // Serialized metadata; updated directly by the streams and the file system in this package.
+  String name;
+  long length;
+  int chunks;
+  long created = System.currentTimeMillis();
+  long modified = created;
+
+  /**
+   * @param fileSystem the file system that owns this file; its chunk size is captured as well
+   * @param name the file name
+   */
+  File(final FileSystem fileSystem, final String name) {
+    this.fileSystem = fileSystem;
+    this.chunkSize = fileSystem.chunkSize;
+    this.name = name;
+  }
+
+  /**
+   * Attaches this file to a file system and picks up that file system's chunk size.
+   *
+   * @param fileSystem the owning file system
+   */
+  void setFileSystem(final FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+    this.chunkSize = fileSystem.chunkSize;
+  }
+
+  /**
+   * @return the file system this file is currently attached to
+   */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  /**
+   * @return the chunk size taken from the attached file system
+   */
+  int getChunkSize() {
+    return chunkSize;
+  }
+
+  /**
+   * @return the name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the length
+   */
+  public long getLength() {
+    return length;
+  }
+
+  /**
+   * @return the created
+   */
+  public long getCreated() {
+    return created;
+  }
+
+  /**
+   * @return the modified
+   */
+  public long getModified() {
+    return modified;
+  }
+
+  /**
+   * @return a new stream that reads this file's chunks from the beginning
+   */
+  public InputStream getInputStream() {
+    // TODO get read lock?
+    final InputStream stream = new FileInputStream(this);
+    return stream;
+  }
+
+  /**
+   * @return a new stream that writes chunks for this file
+   */
+  public OutputStream getOutputStream() {
+    final OutputStream stream = new FileOutputStream(this);
+    return stream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java
new file mode 100644
index 0000000..0565974
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java
@@ -0,0 +1,97 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Sequential reader over the chunks of a {@link File}. Chunks are fetched one at a time from the
+ * file's {@link FileSystem}, starting at chunk 0; a missing chunk ({@code null}) marks the end of
+ * the file.
+ */
+final class FileInputStream extends InputStream {
+
+  private final File file;
+  /** Chunk currently being read, or {@code null} once the end of the file was reached. */
+  private byte[] chunk = null;
+  /** Read offset inside {@link #chunk}. */
+  private int chunkPosition = 0;
+  /** Id of the next chunk to fetch. */
+  private int chunkId = 0;
+  private boolean open = true;
+
+  /**
+   * Opens the stream and eagerly fetches the first chunk.
+   *
+   * @param file the file to read
+   */
+  public FileInputStream(File file) {
+    this.file = file;
+    nextChunk();
+  }
+
+  /**
+   * @return the next byte as an unsigned value, or -1 at end of file
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public int read() throws IOException {
+    assertOpen();
+
+    checkAndFetchNextChunk();
+
+    if (null == chunk) {
+      return -1;
+    }
+
+    return chunk[chunkPosition++] & 0xff;
+  }
+
+  /**
+   * Reads up to {@code len} bytes, crossing chunk boundaries as needed, and stops early only at
+   * end of file.
+   *
+   * @return the number of bytes read, 0 if {@code len} is 0, or -1 if no byte could be read
+   *         because the end of the file was reached
+   * @throws IOException if the stream is closed
+   * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range of
+   *         {@code b}
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    assertOpen();
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      // InputStream contract: a zero-length request returns 0, even at end of file.
+      return 0;
+    }
+
+    checkAndFetchNextChunk();
+
+    if (null == chunk) {
+      return -1;
+    }
+
+    int read = 0;
+    while (len > 0 && null != chunk) {
+      final int min = Math.min(remaining(), len);
+      System.arraycopy(chunk, chunkPosition, b, off, min);
+      off += min;
+      len -= min;
+      chunkPosition += min;
+      read += min;
+
+      if (len > 0) {
+        // we read to the end of the chunk, fetch another.
+        nextChunk();
+      }
+    }
+
+    return read;
+  }
+
+  /**
+   * @return the number of unread bytes in the current chunk, or 0 at end of file
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public int available() throws IOException {
+    assertOpen();
+
+    // At end of file there is no current chunk; report 0 instead of dereferencing null.
+    return null == chunk ? 0 : remaining();
+  }
+
+  @Override
+  public void close() throws IOException {
+    open = false;
+  }
+
+  /** Unread bytes in the current chunk; callers must ensure {@link #chunk} is not null. */
+  private int remaining() {
+    return chunk.length - chunkPosition;
+  }
+
+  /**
+   * Advances to the next chunk that still has unread bytes. A loop rather than a single check so
+   * that an empty chunk is stepped over instead of being indexed into.
+   */
+  private void checkAndFetchNextChunk() {
+    while (null != chunk && remaining() <= 0) {
+      nextChunk();
+    }
+  }
+
+  private void nextChunk() {
+    chunk = file.getFileSystem().getChunk(this.file, chunkId++);
+    chunkPosition = 0;
+  }
+
+  private void assertOpen() throws IOException {
+    if (!open) {
+      throw new IOException("Closed");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java
new file mode 100644
index 0000000..58d9572
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java
@@ -0,0 +1,70 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Writer that appends data to a {@link File} chunk by chunk. Bytes are collected in a buffer of
+ * the file's chunk size; each flush stores the buffered bytes as the file's next chunk through
+ * the file's {@link FileSystem}. The file's length is updated as bytes are written, and the file
+ * metadata is stored when the stream is closed.
+ */
+final class FileOutputStream extends OutputStream {
+
+  private final File file;
+  private ByteBuffer buffer;
+  private boolean open = true;
+
+  /**
+   * @param file the file to write to; its chunk size determines the buffer capacity
+   */
+  public FileOutputStream(final File file) {
+    this.file = file;
+    buffer = ByteBuffer.allocate(file.getChunkSize());
+  }
+
+  /**
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public void write(final int b) throws IOException {
+    assertOpen();
+
+    if (buffer.remaining() == 0) {
+      flushBuffer();
+    }
+
+    buffer.put((byte) b);
+    file.length++;
+  }
+
+  /**
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public void write(final byte[] b, int off, int len) throws IOException {
+    assertOpen();
+
+    while (len > 0) {
+      if (buffer.remaining() == 0) {
+        flushBuffer();
+      }
+
+      // Copy as much as fits into the current chunk; the loop handles the rest.
+      final int min = Math.min(buffer.remaining(), len);
+      buffer.put(b, off, min);
+      off += min;
+      len -= min;
+      file.length += min;
+    }
+  }
+
+  /**
+   * Stores whatever is buffered as a final chunk (an empty chunk if nothing is buffered), stamps
+   * the modification time and writes the file metadata back. Subsequent calls do nothing.
+   */
+  @Override
+  public void close() throws IOException {
+    if (open) {
+      flushBuffer();
+      file.modified = System.currentTimeMillis();
+      file.getFileSystem().updateFile(file);
+      open = false;
+      buffer = null;
+    }
+  }
+
+  /** Stores the buffered bytes as the next chunk of the file and empties the buffer. */
+  private void flushBuffer() {
+    // The buffered bytes occupy [arrayOffset, arrayOffset + position) of the backing array.
+    final int start = buffer.arrayOffset();
+    final byte[] chunk = Arrays.copyOfRange(buffer.array(), start, start + buffer.position());
+    file.getFileSystem().putChunk(file, file.chunks++, chunk);
+    buffer.clear();
+  }
+
+  private void assertOpen() throws IOException {
+    if (!open) {
+      throw new IOException("Closed");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java
new file mode 100644
index 0000000..8bba0df
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java
@@ -0,0 +1,111 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+
+
+import com.gemstone.gemfire.cache.Region;
+
+/**
+ * Minimal file system stored in two regions: {@code fileRegion} maps a file name to its
+ * {@link File} metadata, and {@code chunkRegion} maps a {@link ChunkKey} (file name plus chunk
+ * id) to the bytes of that chunk.
+ *
+ * <p>No locking is performed here (see the TODOs below); multi-step operations such as delete
+ * and rename are sequences of individual region operations.
+ */
+public class FileSystem {
+  // private final Cache cache;
+  private final Region<String, File> fileRegion;
+  private final Region<ChunkKey, byte[]> chunkRegion;
+
+  /** Capacity, in bytes, of the write buffer that becomes one chunk. */
+  final int chunkSize = 1_000_000;
+
+  /**
+   * @param fileRegion region holding file metadata keyed by file name
+   * @param chunkRegion region holding chunk contents keyed by {@link ChunkKey}
+   */
+  public FileSystem(Region<String, File> fileRegion, Region<ChunkKey, byte[]> chunkRegion) {
+    this.fileRegion = fileRegion;
+    this.chunkRegion = chunkRegion;
+  }
+
+  /**
+   * @return the key set of the file region, i.e. the names of all files
+   */
+  public Collection<String> listFileNames() {
+    return fileRegion.keySet();
+  }
+
+  /**
+   * Registers a new, empty file.
+   *
+   * @param name name of the file to create
+   * @return the newly created file
+   * @throws IOException if a file with that name is already registered
+   */
+  public File createFile(final String name) throws IOException {
+    // TODO lock region ?
+    final File file = new File(this, name);
+    if (null != fileRegion.putIfAbsent(name, file)) {
+      throw new IOException("File exists.");
+    }
+    // TODO unlock region ?
+    return file;
+  }
+
+  /**
+   * Looks up a file and attaches it to this file system.
+   *
+   * @param name name of the file
+   * @return the file's metadata
+   * @throws FileNotFoundException if no file with that name is registered
+   */
+  public File getFile(final String name) throws FileNotFoundException {
+    final File file = fileRegion.get(name);
+
+    if (null == file) {
+      throw new FileNotFoundException(name);
+    }
+
+    // The file system reference is transient in File, so attach it on every lookup.
+    file.setFileSystem(this);
+    return file;
+  }
+
+  /**
+   * Removes the chunks of a file, starting at chunk 0 and stopping at the first missing chunk
+   * id, and then removes the file's metadata.
+   *
+   * @param name name of the file to delete
+   */
+  public void deleteFile(final String name) {
+    // TODO locks?
+
+    // TODO consider removeAll with all ChunkKeys listed.
+    final ChunkKey key = new ChunkKey(name, 0);
+    while (true) {
+      // TODO consider mutable ChunkKey
+      if (null == chunkRegion.remove(key)) {
+        // no more chunks
+        break;
+      }
+      key.chunkId++;
+    }
+
+    fileRegion.remove(name);
+  }
+
+  /**
+   * Renames a file by registering the destination, removing the source metadata, and moving the
+   * chunks one by one to the destination name.
+   *
+   * @param source current name of the file
+   * @param dest new name of the file
+   * @throws IOException if {@code dest} already exists
+   * @throws FileNotFoundException if {@code source} does not exist; in that case the
+   *         destination entry created by this call is removed again
+   */
+  public void renameFile(String source, String dest) throws IOException {
+    final File destFile = createFile(dest);
+
+    final File sourceFile = fileRegion.remove(source);
+    if (null == sourceFile) {
+      // Roll back the destination entry registered above so that a failed rename does not
+      // leave an empty file behind.
+      fileRegion.remove(dest);
+      throw new FileNotFoundException(source);
+    }
+
+    destFile.chunks = sourceFile.chunks;
+    destFile.created = sourceFile.created;
+    destFile.length = sourceFile.length;
+    destFile.modified = sourceFile.modified;
+
+    // TODO copy on write?
+    final ChunkKey sourceKey = new ChunkKey(source, 0);
+    while (true) {
+      final byte[] chunk = chunkRegion.remove(sourceKey);
+      if (null == chunk) {
+        // no more chunks
+        break;
+      }
+      putChunk(destFile, sourceKey.chunkId, chunk);
+      sourceKey.chunkId++;
+    }
+
+    // Store the metadata copied from the source under the destination name.
+    updateFile(destFile);
+  }
+
+  /**
+   * @param file the file the chunk belongs to
+   * @param id the chunk id
+   * @return the chunk's bytes, or {@code null} if the chunk region has no entry for it
+   */
+  byte[] getChunk(final File file, final int id) {
+    return chunkRegion.get(new ChunkKey(file.getName(), id));
+  }
+
+  /**
+   * Stores one chunk of a file.
+   *
+   * @param file the file the chunk belongs to
+   * @param id the chunk id
+   * @param chunk the chunk's bytes
+   */
+  public void putChunk(final File file, final int id, final byte[] chunk) {
+    chunkRegion.put(new ChunkKey(file.getName(), id), chunk);
+  }
+
+  /**
+   * Writes the file's current metadata into the file region under the file's name.
+   *
+   * @param file the file whose metadata should be stored
+   */
+  void updateFile(File file) {
+    fileRegion.put(file.getName(), file);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
new file mode 100644
index 0000000..13700a7
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
@@ -0,0 +1,195 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.BaseDirectory;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.store.SingleInstanceLockFactory;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Lucene directory whose files live in GemFire regions. For a data region named {@code X} the
+ * directory uses (and creates if necessary) the regions {@code X.files} and {@code X.chunks} and
+ * accesses them through a {@link FileSystem}.
+ */
+public class RegionDirectory extends BaseDirectory {
+
+  /**
+   * Index input that reads a {@link File} through a sequential stream. Seeking re-opens the
+   * stream and skips forward from the start of the file.
+   *
+   * <p>NOTE(review): instances produced by {@code clone()} would share the same stream and
+   * therefore the same read position as the original - confirm whether {@code clone()} needs
+   * to be overridden for Lucene's usage.
+   */
+  private static final class FileIndexInput extends BufferedIndexInput {
+    private final File file;
+    InputStream in;
+
+    private FileIndexInput(String resourceDesc, File file) {
+      super(resourceDesc);
+      this.file = file;
+      in = file.getInputStream();
+    }
+
+    @Override
+    public long length() {
+      return file.getLength();
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      in.close();
+      in = file.getInputStream();
+
+      // skip() may skip fewer bytes than requested, so keep going until the position is reached
+      // or the stream makes no more progress (end of file); a later read then reports the EOF.
+      long remaining = pos;
+      while (remaining > 0) {
+        final long skipped = in.skip(remaining);
+        if (skipped <= 0) {
+          break;
+        }
+        remaining -= skipped;
+      }
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      // The caller expects exactly 'length' bytes; a single read() may deliver fewer, and a
+      // premature end of file must not go unnoticed.
+      int total = 0;
+      while (total < length) {
+        final int count = in.read(b, offset + total, length - total);
+        if (count < 0) {
+          throw new java.io.EOFException("read past EOF: " + this);
+        }
+        total += count;
+      }
+    }
+  }
+
+  /** Number of successfully constructed directories that have not been closed yet. */
+  private static final AtomicInteger cacheCount = new AtomicInteger();
+  /** Value of the {@code lucene.createCache} system property. */
+  private static final boolean CREATE_CACHE = Boolean.getBoolean("lucene.createCache");
+  private static final Logger logger = LogService.getLogger();
+
+  private final Cache cache;
+  private final FileSystem fs;
+
+  /**
+   * Create RegionDirectory to save index documents in file format into Gemfire region.
+   * @param dataRegionName data region's full name to build index from
+   * @throws IllegalStateException if no cache is available and {@code lucene.createCache} is
+   *         not set, or if the data region is not configured with storage
+   * @throws IllegalArgumentException if the data region does not exist
+   */
+  public RegionDirectory(String dataRegionName) {
+    super(new SingleInstanceLockFactory());
+
+    Cache cache = null;
+    try {
+      cache = CacheFactory.getAnyInstance();
+    } catch (Exception ignored) {
+      // No usable cache could be obtained; the null check below decides how to proceed.
+    }
+    if (null == cache) {
+      if (CREATE_CACHE) {
+        cache = new CacheFactory().set("mcast-port", "0").set("log-level", "error").create();
+        logger.info("Created cache in RegionDirectory");
+      } else {
+        throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString());
+      }
+    } else {
+      logger.info("Found cache in RegionDirectory");
+    }
+    this.cache = cache;
+
+    final Region<?, ?> dataRegion = cache.getRegion(dataRegionName);
+    if (null == dataRegion) {
+      // An assert would be skipped when assertions are disabled and fail later with an NPE.
+      throw new IllegalArgumentException("Data region " + dataRegionName + " does not exist");
+    }
+    final RegionAttributes<?, ?> ra = dataRegion.getAttributes();
+    final DataPolicy dp = ra.getDataPolicy();
+    final boolean isPartitionedRegion = ra.getPartitionAttributes() != null;
+    final boolean withPersistence = dp.withPersistence();
+    final boolean withStorage = isPartitionedRegion
+        ? ra.getPartitionAttributes().getLocalMaxMemory() > 0
+        : dp.withStorage();
+
+    // TODO: 1) dataRegion should be withStorage
+    //       2) Persistence to Persistence
+    //       3) Replicate to Replicate, Partition To Partition
+    //       4) Offheap to Offheap
+    if (!withStorage) {
+      throw new IllegalStateException("The data region to create lucene index should be with storage");
+    }
+
+    // The index regions mirror the data region: same kind (partition/replicate) and persistence.
+    final RegionShortcut regionShortCut;
+    if (isPartitionedRegion) {
+      regionShortCut = withPersistence ? RegionShortcut.PARTITION_PERSISTENT : RegionShortcut.PARTITION;
+    } else {
+      regionShortCut = withPersistence ? RegionShortcut.REPLICATE_PERSISTENT : RegionShortcut.REPLICATE;
+    }
+
+    final String fileRegionName = dataRegionName + ".files";
+    Region<String, File> fileRegion = cache.<String, File> getRegion(fileRegionName);
+    if (null == fileRegion) {
+      fileRegion = RegionDirectory.<String, File> createIndexRegion(cache, fileRegionName,
+          regionShortCut, isPartitionedRegion ? dataRegionName : null);
+    }
+
+    final String chunkRegionName = dataRegionName + ".chunks";
+    Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]> getRegion(chunkRegionName);
+    if (null == chunkRegion) {
+      chunkRegion = RegionDirectory.<ChunkKey, byte[]> createIndexRegion(cache, chunkRegionName,
+          regionShortCut, isPartitionedRegion ? fileRegion.getFullPath() : null);
+    }
+
+    fs = new FileSystem(fileRegion, chunkRegion);
+
+    // Count this directory only once construction can no longer fail, so that a failed
+    // constructor does not leave the counter permanently above zero.
+    cacheCount.incrementAndGet();
+  }
+
+  /**
+   * Creates one of the index regions. Partition attributes (colocation) are applied only when a
+   * colocation target is given, i.e. for a partitioned data region; replicated regions are
+   * created from the shortcut alone.
+   *
+   * @param cache the cache to create the region in
+   * @param regionName name of the region to create
+   * @param shortcut region shortcut selecting the region kind
+   * @param colocatedWith region to colocate with, or {@code null} for no partition attributes
+   * @return the newly created region
+   */
+  private static <K, V> Region<K, V> createIndexRegion(Cache cache, String regionName,
+      RegionShortcut shortcut, String colocatedWith) {
+    if (null == colocatedWith) {
+      return cache.<K, V> createRegionFactory(shortcut).create(regionName);
+    }
+    return cache.<K, V> createRegionFactory(shortcut)
+        .setPartitionAttributes(new PartitionAttributesFactory<K, V>().setColocatedWith(colocatedWith).create())
+        .create(regionName);
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return fs.listFileNames().toArray(new String[0]);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    fs.deleteFile(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return fs.getFile(name).getLength();
+  }
+
+  @Override
+  public IndexOutput createOutput(final String name, final IOContext context) throws IOException {
+    final File file = fs.createFile(name);
+    final OutputStream out = file.getOutputStream();
+
+    return new OutputStreamIndexOutput(name, out, 1000);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    // Region does not need to sync to disk
+  }
+
+  @Override
+  public void renameFile(String source, String dest) throws IOException {
+    fs.renameFile(source, dest);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    final File file = fs.getFile(name);
+
+    return new FileIndexInput(name, file);
+  }
+
+  /**
+   * Marks the directory closed and closes the cache when the last open directory is closed.
+   * Calling this more than once has no further effect.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (!isOpen) {
+      // Already closed: decrementing again would corrupt the shared counter.
+      return;
+    }
+    isOpen = false;
+    if (0 == cacheCount.decrementAndGet()) {
+      cache.close();
+    }
+  }
+
+}