You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2015/08/17 08:29:11 UTC
incubator-geode git commit: Add index region implementation
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-11 670ce25c7 -> a376120dd
Add index region implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a376120d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a376120d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a376120d
Branch: refs/heads/feature/GEODE-11
Commit: a376120dd437387e0e53db491afecbcfa00db7af
Parents: 670ce25
Author: zhouxh <gz...@pivotal.io>
Authored: Sun Aug 16 23:26:28 2015 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Sun Aug 16 23:26:28 2015 -0700
----------------------------------------------------------------------
.../gemfire/cache/lucene/internal/ChunkKey.java | 66 +++++++
.../gemfire/cache/lucene/internal/File.java | 74 +++++++
.../cache/lucene/internal/FileInputStream.java | 97 +++++++++
.../cache/lucene/internal/FileOutputStream.java | 70 +++++++
.../cache/lucene/internal/FileSystem.java | 111 +++++++++++
.../cache/lucene/internal/RegionDirectory.java | 195 +++++++++++++++++++
6 files changed, 613 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java
new file mode 100644
index 0000000..86d9bcb
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/ChunkKey.java
@@ -0,0 +1,66 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.Serializable;
+
+public class ChunkKey implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ String fileName;
+ int chunkId;
+
+ ChunkKey(String fileName, int chunkId) {
+ this.fileName = fileName;
+ this.chunkId = chunkId;
+ }
+
+ /**
+ * @return the fileName
+ */
+ public String getFileName() {
+ return fileName;
+ }
+
+ /**
+ * @return the chunkId
+ */
+ public int getChunkId() {
+ return chunkId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + fileName.hashCode();
+ result = prime * result + chunkId;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof ChunkKey)) {
+ return false;
+ }
+ ChunkKey other = (ChunkKey) obj;
+ if (chunkId != other.chunkId) {
+ return false;
+ }
+ if (fileName == null) {
+ if (other.fileName != null) {
+ return false;
+ }
+ } else if (!fileName.equals(other.fileName)) {
+ return false;
+ }
+ return true;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java
new file mode 100644
index 0000000..9df6641
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/File.java
@@ -0,0 +1,74 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+final public class File implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private transient FileSystem fileSystem;
+ private transient int chunkSize;
+
+ String name;
+ long length = 0;
+ int chunks = 0;
+ long created = System.currentTimeMillis();
+ long modified = created;
+
+ File(final FileSystem fileSystem, final String name) {
+ setFileSystem(fileSystem);
+
+ this.name = name;
+ }
+
+ /**
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return the length
+ */
+ public long getLength() {
+ return length;
+ }
+
+ /**
+ * @return the created
+ */
+ public long getCreated() {
+ return created;
+ }
+
+ /**
+ * @return the modified
+ */
+ public long getModified() {
+ return modified;
+ }
+
+ public InputStream getInputStream() {
+ // TODO get read lock?
+ return new FileInputStream(this);
+ }
+
+ public OutputStream getOutputStream() {
+ return new FileOutputStream(this);
+ }
+
+ void setFileSystem(final FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ this.chunkSize = fileSystem.chunkSize;
+ }
+
+ int getChunkSize() {
+ return chunkSize;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java
new file mode 100644
index 0000000..0565974
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileInputStream.java
@@ -0,0 +1,97 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+final class FileInputStream extends InputStream {
+
+ private final File file;
+ private byte[] chunk = null;
+ private int chunkPosition = 0;
+ private int chunkId = 0;
+ private boolean open = true;
+
+ public FileInputStream(File file) {
+ this.file = file;
+ nextChunk();
+ }
+
+ @Override
+ public int read() throws IOException {
+ assertOpen();
+
+ checkAndFetchNextChunk();
+
+ if (null == chunk) {
+ return -1;
+ }
+
+ return chunk[chunkPosition++] & 0xff;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ assertOpen();
+
+ checkAndFetchNextChunk();
+
+ if (null == chunk) {
+ return -1;
+ }
+
+ int read = 0;
+ while (len > 0) {
+ final int min = Math.min(remaining(), len);
+ System.arraycopy(chunk, chunkPosition, b, off, min);
+ off += min;
+ len -= min;
+ chunkPosition += min;
+ read += min;
+
+ if (len > 0) {
+ // we read to the end of the chunk, fetch another.
+ nextChunk();
+ if (null == chunk) {
+ break;
+ }
+ }
+ }
+
+ return read;
+ }
+
+ @Override
+ public int available() throws IOException {
+ assertOpen();
+
+ return remaining();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (open) {
+ open = false;
+ }
+ }
+
+ private int remaining() {
+ return chunk.length - chunkPosition;
+ }
+
+ private void checkAndFetchNextChunk() {
+ if (null != chunk && remaining() <= 0) {
+ nextChunk();
+ }
+ }
+
+ private void nextChunk() {
+ chunk = file.getFileSystem().getChunk(this.file, chunkId++);
+ chunkPosition = 0;
+ }
+
+ private void assertOpen() throws IOException {
+ if (!open) {
+ throw new IOException("Closed");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java
new file mode 100644
index 0000000..58d9572
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileOutputStream.java
@@ -0,0 +1,70 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+final class FileOutputStream extends OutputStream {
+
+ private final File file;
+ private ByteBuffer buffer;
+ private boolean open = true;
+
+ public FileOutputStream(final File file) {
+ this.file = file;
+ buffer = ByteBuffer.allocate(file.getChunkSize());
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ assertOpen();
+
+ if (buffer.remaining() == 0) {
+ flushBuffer();
+ }
+
+ buffer.put((byte) b);
+ file.length++;
+ }
+
+ @Override
+ public void write(final byte[] b, int off, int len) throws IOException {
+ assertOpen();
+
+ while (len > 0) {
+ if (buffer.remaining() == 0) {
+ flushBuffer();
+ }
+
+ final int min = Math.min(buffer.remaining(), len);
+ buffer.put(b, off, min);
+ off += min;
+ len -= min;
+ file.length += min;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (open) {
+ flushBuffer();
+ file.modified = System.currentTimeMillis();
+ file.getFileSystem().updateFile(file);
+ open = false;
+ buffer = null;
+ }
+ }
+
+ private void flushBuffer() {
+ byte[] chunk = Arrays.copyOfRange(buffer.array(), buffer.arrayOffset(), buffer.position());
+ file.getFileSystem().putChunk(file, file.chunks++, chunk);
+ buffer.rewind();
+ }
+
+ private void assertOpen() throws IOException {
+ if (!open) {
+ throw new IOException("Closed");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java
new file mode 100644
index 0000000..8bba0df
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/FileSystem.java
@@ -0,0 +1,111 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+
+
+import com.gemstone.gemfire.cache.Region;
+
+public class FileSystem {
+ // private final Cache cache;
+ private final Region<String, File> fileRegion;
+ private final Region<ChunkKey, byte[]> chunkRegion;
+
+ final int chunkSize = 1_000_000;
+
+ public FileSystem(Region<String, File> fileRegion, Region<ChunkKey, byte[]> chunkRegion) {
+ super();
+ this.fileRegion = fileRegion;
+ this.chunkRegion = chunkRegion;
+ }
+
+ public Collection<String> listFileNames() {
+ return fileRegion.keySet();
+ }
+
+ public File createFile(final String name) throws IOException {
+ // TODO lock region ?
+ final File file = new File(this, name);
+ if (null != fileRegion.putIfAbsent(name, file)) {
+ throw new IOException("File exists.");
+ }
+ // TODO unlock region ?
+ return file;
+ }
+
+ public File getFile(final String name) throws FileNotFoundException {
+ final File file = fileRegion.get(name);
+
+ if (null == file) {
+ throw new FileNotFoundException(name);
+ }
+
+ file.setFileSystem(this);
+ return file;
+ }
+
+ public void deleteFile(final String name) {
+ // TODO locks?
+
+ // TODO consider removeAll with all ChunkKeys listed.
+ final ChunkKey key = new ChunkKey(name, 0);
+ while (true) {
+ // TODO consider mutable ChunkKey
+ if (null == chunkRegion.remove(key)) {
+ // no more chunks
+ break;
+ }
+ key.chunkId++;
+ }
+
+ fileRegion.remove(name);
+ }
+
+ public void renameFile(String source, String dest) throws IOException {
+ final File destFile = createFile(dest);
+
+ final File sourceFile = fileRegion.remove(source);
+ if (null == sourceFile) {
+ throw new FileNotFoundException(source);
+ }
+
+ destFile.chunks = sourceFile.chunks;
+ destFile.created = sourceFile.created;
+ destFile.length = sourceFile.length;
+ destFile.modified = sourceFile.modified;
+
+ // TODO copy on write?
+ final ChunkKey sourceKey = new ChunkKey(source, 0);
+ while (true) {
+ byte[] chunk = chunkRegion.remove(sourceKey);
+ if (null == chunk) {
+ // no more chunks
+ break;
+ }
+ putChunk(destFile, sourceKey.chunkId, chunk);
+ sourceKey.chunkId++;
+ }
+
+ updateFile(destFile);
+ }
+
+ byte[] getChunk(final File file, final int id) {
+ final ChunkKey key = new ChunkKey(file.getName(), id);
+ final byte[] chunk = chunkRegion.get(key);
+ return chunk;
+ }
+
+ public void putChunk(final File file, final int id, final byte[] chunk) {
+ final ChunkKey key = new ChunkKey(file.getName(), id);
+ chunkRegion.put(key, chunk);
+ }
+
+ void updateFile(File file) {
+ fileRegion.put(file.getName(), file);
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a376120d/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
new file mode 100644
index 0000000..13700a7
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RegionDirectory.java
@@ -0,0 +1,195 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.BaseDirectory;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.store.SingleInstanceLockFactory;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class RegionDirectory extends BaseDirectory {
+
+ private static final class FileIndexInput extends BufferedIndexInput {
+ private final File file;
+ InputStream in;
+
+ private FileIndexInput(String resourceDesc, File file) {
+ super(resourceDesc);
+ this.file = file;
+ in = file.getInputStream();
+ }
+
+ @Override
+ public long length() {
+ return file.getLength();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ protected void seekInternal(long pos) throws IOException {
+ in.close();
+ in = file.getInputStream();
+ in.skip(pos);
+ }
+
+ @Override
+ protected void readInternal(byte[] b, int offset, int length) throws IOException {
+ in.read(b, offset, length);
+ }
+ }
+
+ static private final AtomicInteger cacheCount = new AtomicInteger();
+ static private final boolean CREATE_CACHE = Boolean.getBoolean("lucene.createCache");
+ private static final Logger logger = LogService.getLogger();
+
+ private Cache cache;
+ private final FileSystem fs;
+
+ /**
+ * Create RegionDirectory to save index documents in file format into Gemfire region.
+ * @param dataRegionName data region's full name to build index from
+ */
+ public RegionDirectory(String dataRegionName) {
+ super(new SingleInstanceLockFactory());
+
+ Cache cache = null;
+ try {
+ cache = CacheFactory.getAnyInstance();
+ } catch (Exception e) {
+ //ignore
+ }
+ if (null == cache) {
+ if (CREATE_CACHE) {
+ cache = new CacheFactory().set("mcast-port", "0").set("log-level", "error").create();
+ logger.info("Created cache in RegionDirectory");
+ } else {
+ throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString());
+ }
+ } else {
+ logger.info("Found cache in RegionDirectory");
+ }
+ this.cache = cache;
+ assert this.cache != null;
+ cacheCount.incrementAndGet();
+
+ Region dataRegion = cache.getRegion(dataRegionName);
+ assert dataRegion != null;
+ RegionAttributes ra = dataRegion.getAttributes();
+ DataPolicy dp = ra.getDataPolicy();
+ final boolean isPartitionedRegion = (ra.getPartitionAttributes() == null) ? false : true;
+ final boolean withPersistence = dp.withPersistence();
+ final boolean withStorage = isPartitionedRegion?ra.getPartitionAttributes().getLocalMaxMemory()>0:dp.withStorage();
+ RegionShortcut regionShortCut;
+ if (isPartitionedRegion) {
+ if (withPersistence) {
+ regionShortCut = RegionShortcut.PARTITION_PERSISTENT;
+ } else {
+ regionShortCut = RegionShortcut.PARTITION;
+ }
+ } else {
+ if (withPersistence) {
+ regionShortCut = RegionShortcut.REPLICATE_PERSISTENT;
+ } else {
+ regionShortCut = RegionShortcut.REPLICATE;
+ }
+ }
+
+// final boolean isOffHeap = ra.getOffHeap();
+ // TODO: 1) dataRegion should be withStorage
+ // 2) Persistence to Persistence
+ // 3) Replicate to Replicate, Partition To Partition
+ // 4) Offheap to Offheap
+ if (!withStorage) {
+ throw new IllegalStateException("The data region to create lucene index should be with storage");
+ }
+
+ final String fileRegionName = dataRegionName+".files";
+ Region<String, File> fileRegion = cache.<String, File> getRegion(fileRegionName);
+ if (null == fileRegion) {
+ fileRegion = cache.<String, File> createRegionFactory(regionShortCut)
+ .setPartitionAttributes(new PartitionAttributesFactory<String, File>().setColocatedWith(dataRegionName).create())
+ .create(fileRegionName);
+ }
+
+ final String chunkRegionName = dataRegionName + ".chunks";
+ Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]> getRegion(chunkRegionName);
+ if (null == chunkRegion) {
+ chunkRegion = cache.<ChunkKey, byte[]> createRegionFactory(regionShortCut)
+ .setPartitionAttributes(new PartitionAttributesFactory<ChunkKey, byte[]>().setColocatedWith(fileRegion.getFullPath()).create())
+ .create(chunkRegionName);
+ }
+
+ fs = new FileSystem(fileRegion, chunkRegion);
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ return fs.listFileNames().toArray(new String[] {});
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ fs.deleteFile(name);
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ return fs.getFile(name).getLength();
+ }
+
+ @Override
+ public IndexOutput createOutput(final String name, final IOContext context) throws IOException {
+ final File file = fs.createFile(name);
+ final OutputStream out = file.getOutputStream();
+
+ return new OutputStreamIndexOutput(name, out, 1000);
+ }
+
+ @Override
+ public void sync(Collection<String> names) throws IOException {
+ // Region does not need to sync to disk
+ }
+
+ @Override
+ public void renameFile(String source, String dest) throws IOException {
+ fs.renameFile(source, dest);
+ }
+
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ final File file = fs.getFile(name);
+
+ return new FileIndexInput(name, file);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (0 == cacheCount.decrementAndGet()) {
+ cache.close();
+ }
+ isOpen = false;
+ }
+
+}