You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:33 UTC
[45/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-lucene/src/main/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-lucene/src/main/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java
new file mode 100644
index 0000000..48a2ca9
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntries.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Holds an ordered collection of entries matching a search query. Entries are kept in the order in which they are
+ * added; {@link #addHit(EntryScore)} rejects an entry whose score is higher than the score of the entry added before
+ * it, and silently drops entries once {@link #getLimit()} entries are held.
+ *
+ * @param <K> the type of key
+ */
+public class TopEntries<K> implements DataSerializableFixedID {
+  // ordered collection of entries
+  private List<EntryScore<K>> hits = new ArrayList<>();
+
+  // the maximum number of entries stored in this collection
+  private int limit;
+
+  // comparator to order entryScore instances
+  final Comparator<EntryScore<K>> comparator = new EntryScoreComparator();
+
+  /**
+   * Creates a collection limited to {@link LuceneQueryFactory#DEFAULT_LIMIT} entries.
+   */
+  public TopEntries() {
+    this(LuceneQueryFactory.DEFAULT_LIMIT);
+  }
+
+  /**
+   * Creates a collection limited to the given number of entries.
+   *
+   * @param limit the maximum number of entries to retain; must not be negative
+   * @throws IllegalArgumentException if {@code limit} is negative
+   */
+  public TopEntries(int limit) {
+    if (limit < 0) {
+      throw new IllegalArgumentException("limit must not be negative: " + limit);
+    }
+    this.limit = limit;
+  }
+
+  /**
+   * Adds an entry to the collection. The new entry must not have a higher score than the entry added immediately
+   * before it (an equal score is accepted). The new entry will be ignored if the limit is already reached.
+   *
+   * @param entry the entry to append
+   * @throws IllegalArgumentException if the score of {@code entry} is higher than the score of the last entry held
+   */
+  public void addHit(EntryScore<K> entry) {
+    // The ordering check runs before the limit check, so an out-of-order entry is rejected even when the collection
+    // is already full.
+    if (!hits.isEmpty()) {
+      EntryScore<K> lastEntry = hits.get(hits.size() - 1);
+      if (comparator.compare(lastEntry, entry) < 0) {
+        throw new IllegalArgumentException("Entry score " + entry.getScore()
+            + " is higher than the score of the last added entry: " + lastEntry.getScore());
+      }
+    }
+
+    if (hits.size() >= limit) {
+      return;
+    }
+
+    hits.add(entry);
+  }
+
+  /**
+   * @return count of entries in the collection
+   */
+  public int size() {
+    return hits.size();
+  }
+
+  /**
+   * @return The entries collection managed by this instance. This is the internal list itself, not a copy.
+   */
+  public List<EntryScore<K>> getHits() {
+    return hits;
+  }
+
+  /**
+   * @return The maximum capacity of this collection
+   */
+  public int getLimit() {
+    return limit;
+  }
+
+  /**
+   * Compares scores of two entries using natural ordering, i.e. the result is negative if the first entry's score is
+   * less than the second one's, as defined by {@link Float#compare(float, float)}.
+   */
+  // Deliberately left as a (non-static) inner class: it is instantiated elsewhere through an enclosing instance
+  // (new TopEntries().new EntryScoreComparator()).
+  class EntryScoreComparator implements Comparator<EntryScore<K>> {
+    @Override
+    public int compare(EntryScore<K> o1, EntryScore<K> o2) {
+      return Float.compare(o1.getScore(), o2.getScore());
+    }
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    // no version list is supplied for this class
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {
+    return LUCENE_TOP_ENTRIES;
+  }
+
+  /**
+   * Writes the limit followed by the list of hits.
+   */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(limit);
+    DataSerializer.writeObject(hits, out);
+  }
+
+  /**
+   * Reads the limit followed by the list of hits, in the order written by {@link #toData(DataOutput)}.
+   */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    limit = in.readInt();
+    hits = DataSerializer.readObject(in);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java
new file mode 100644
index 0000000..94b8a3a
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollector.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * An {@link IndexResultCollector} that gathers {@link EntryScore} instances into a {@link TopEntries}. Results are
+ * expected to arrive ordered by the score of the entry.
+ */
+public class TopEntriesCollector implements IndexResultCollector, DataSerializableFixedID {
+  // name reported by getName(); may be null
+  private String name;
+
+  // the entries collected so far
+  private TopEntries entries;
+
+  /**
+   * Creates an unnamed collector using the default limit.
+   */
+  public TopEntriesCollector() {
+    this(null);
+  }
+
+  /**
+   * Creates a named collector limited to {@link LuceneQueryFactory#DEFAULT_LIMIT} entries.
+   *
+   * @param name the name of this collector
+   */
+  public TopEntriesCollector(String name) {
+    this(name, LuceneQueryFactory.DEFAULT_LIMIT);
+  }
+
+  /**
+   * Creates a named collector holding at most {@code limit} entries.
+   *
+   * @param name the name of this collector
+   * @param limit the limit handed to the underlying {@link TopEntries}
+   */
+  public TopEntriesCollector(String name, int limit) {
+    this.entries = new TopEntries(limit);
+    this.name = name;
+  }
+
+  /**
+   * Wraps the key and score in an {@link EntryScore} and collects it.
+   */
+  @Override
+  public void collect(Object key, float score) {
+    EntryScore wrapped = new EntryScore(key, score);
+    collect(wrapped);
+  }
+
+  /**
+   * Appends the given entry to the collected entries.
+   *
+   * @param entry the entry to collect
+   */
+  public void collect(EntryScore entry) {
+    entries.addHit(entry);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the number of collected entries, or 0 when no entries object is available
+   */
+  @Override
+  public int size() {
+    TopEntries current = getEntries();
+    if (current == null) {
+      return 0;
+    }
+    return current.size();
+  }
+
+  /**
+   * @return The entries collected by this collector
+   */
+  public TopEntries getEntries() {
+    return entries;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {
+    return LUCENE_TOP_ENTRIES_COLLECTOR;
+  }
+
+  /**
+   * Writes the name followed by the collected entries.
+   */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeString(name, out);
+    DataSerializer.writeObject(entries, out);
+  }
+
+  /**
+   * Reads the name followed by the collected entries, mirroring {@link #toData(DataOutput)}.
+   */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    name = DataSerializer.readString(in);
+    entries = DataSerializer.readObject(in);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
new file mode 100644
index 0000000..cf6e420
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntries.EntryScoreComparator;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * An implementation of {@link CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to
+ * collect top matching entries from local buckets
+ */
+public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCollector>, DataSerializableFixedID {
+ private static final Logger logger = LogService.getLogger();
+
+ private int limit;
+ private String id;
+
+ public TopEntriesCollectorManager() {
+ this(null, 0);
+ }
+
+ public TopEntriesCollectorManager(String id) {
+ this(id, 0);
+ }
+
+ public TopEntriesCollectorManager(String id, int resultLimit) {
+ this.limit = resultLimit <= 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit;
+ this.id = id == null ? String.valueOf(this.hashCode()) : id;
+ logger.debug("Max count of entries to be produced by {} is {}", id, limit);
+ }
+
+ @Override
+ public TopEntriesCollector newCollector(String name) {
+ return new TopEntriesCollector(name, limit);
+ }
+
+ @Override
+ public TopEntriesCollector reduce(Collection<TopEntriesCollector> collectors) {
+ TopEntriesCollector mergedResult = new TopEntriesCollector(id, limit);
+ if (collectors.isEmpty()) {
+ return mergedResult;
+ }
+
+ final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator();
+
+ // orders a entry with higher score above a doc with lower score
+ Comparator<ListScanner> entryListComparator = new Comparator<ListScanner>() {
+ @Override
+ public int compare(ListScanner l1, ListScanner l2) {
+ EntryScore o1 = l1.peek();
+ EntryScore o2 = l2.peek();
+ return scoreComparator.compare(o1, o2);
+ }
+ };
+
+ // The queue contains iterators for all bucket results. The queue puts the entry with the highest score at the head
+ // using score comparator.
+ PriorityQueue<ListScanner> entryListsPriorityQueue;
+ entryListsPriorityQueue = new PriorityQueue<ListScanner>(collectors.size(),
+ Collections.reverseOrder(entryListComparator));
+
+ for (IndexResultCollector collector : collectors) {
+ logger.debug("Number of entries found in collector {} is {}", collector.getName(), collector.size());
+
+ if (collector.size() > 0) {
+ entryListsPriorityQueue.add(new ListScanner(((TopEntriesCollector) collector).getEntries().getHits()));
+ }
+ }
+
+ logger.debug("Only {} count of entries will be reduced. Other entries will be ignored", limit);
+ while (entryListsPriorityQueue.size() > 0 && limit > mergedResult.size()) {
+
+ ListScanner scanner = entryListsPriorityQueue.remove();
+ EntryScore entry = scanner.next();
+ mergedResult.collect(entry);
+
+ if (scanner.hasNext()) {
+ entryListsPriorityQueue.add(scanner);
+ }
+ }
+
+ logger.debug("Reduced size of {} is {}", mergedResult.getName(), mergedResult.size());
+ return mergedResult;
+ }
+
+ /*
+ * Utility class to iterate on hits without modifying it
+ */
+ static class ListScanner {
+ private List<EntryScore> hits;
+ private int index = 0;
+
+ ListScanner(List<EntryScore> hits) {
+ this.hits = hits;
+ }
+
+ boolean hasNext() {
+ return index < hits.size();
+ }
+
+ EntryScore peek() {
+ return hits.get(index);
+ }
+
+ EntryScore next() {
+ return hits.get(index++);
+ }
+ }
+
+ @Override
+ public Version[] getSerializationVersions() {
+ return null;
+ }
+
+ @Override
+ public int getDSFID() {
+ return LUCENE_TOP_ENTRIES_COLLECTOR_MANAGER;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ DataSerializer.writeString(id, out);
+ out.writeInt(limit);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ id = DataSerializer.readString(in);
+ limit = in.readInt();
+ }
+
+ /**
+ * @return Id of this collector, if any
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * @return Result limit enforced by the collectors created by this manager
+ */
+ public int getLimit() {
+ return limit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
new file mode 100644
index 0000000..4a99bf8
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.lucene.LuceneQuery;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * A {@link ResultCollector} implementation for collecting and ordering {@link TopEntries}. The per-member results are
+ * delivered as {@link TopEntriesCollector} instances through {@link ResultCollector#addResult}. The member executing
+ * this logic must have sufficient space to hold all the {@link EntryScore} documents returned from the members.
+ *
+ * <p>
+ * The merge is lazy: sub results are only stored when they are added, and they are reduced into a single result the
+ * first time {@link ResultCollector#getResult} returns. The merged result is then cached and returned by later calls.
+ */
+public class TopEntriesFunctionCollector implements ResultCollector<TopEntriesCollector, TopEntries> {
+  // Use this instance to perform reduce operation
+  final CollectorManager<TopEntriesCollector> manager;
+
+  // latch to wait till all results are collected
+  private final CountDownLatch waitForResults = new CountDownLatch(1);
+
+  final String id;
+
+  // Instance of gemfire cache to check status and other utility methods
+  private final GemFireCacheImpl cache;
+  private static final Logger logger = LogService.getLogger();
+
+  // results added so far; also used as the monitor guarding mergedResults
+  private final Collection<TopEntriesCollector> subResults = new ArrayList<>();
+  private TopEntriesCollector mergedResults;
+
+  public TopEntriesFunctionCollector() {
+    this(null);
+  }
+
+  public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context) {
+    this(context, null);
+  }
+
+  /**
+   * @param context supplies the result limit and, optionally, the collector manager; may be null
+   * @param cache used for the collector id and for cancellation checks; may be null
+   */
+  public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context, GemFireCacheImpl cache) {
+    this.cache = cache;
+    if (cache == null) {
+      id = String.valueOf(this.hashCode());
+    } else {
+      id = cache.getName();
+    }
+
+    int limit = 0;
+    CollectorManager<TopEntriesCollector> suppliedManager = null;
+    if (context != null) {
+      limit = context.getLimit();
+      suppliedManager = context.getCollectorManager();
+    }
+
+    // fall back to a default manager when the context does not supply one
+    if (suppliedManager == null) {
+      this.manager = new TopEntriesCollectorManager(id, limit);
+    } else {
+      this.manager = suppliedManager;
+    }
+  }
+
+  /**
+   * Blocks until {@link #endResults()} has been invoked and then returns the merged entries.
+   */
+  @Override
+  public TopEntries getResult() throws FunctionException {
+    try {
+      waitForResults.await();
+    } catch (InterruptedException e) {
+      throw interruptedWhileWaiting(e);
+    }
+
+    return aggregateResults();
+  }
+
+  /**
+   * Waits at most the given time for {@link #endResults()} to be invoked and then returns the merged entries.
+   *
+   * @throws FunctionException if the wait times out or is interrupted
+   */
+  @Override
+  public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException {
+    boolean completed;
+    try {
+      completed = waitForResults.await(timeout, unit);
+    } catch (InterruptedException e) {
+      throw interruptedWhileWaiting(e);
+    }
+
+    if (!completed) {
+      throw new FunctionException("Did not receive results from all members within wait time");
+    }
+
+    return aggregateResults();
+  }
+
+  // Shared interrupt handling: logs, restores the interrupt flag, lets the cache raise its own cancellation
+  // exception if there is one, and otherwise hands back the exception for the caller to throw.
+  private FunctionException interruptedWhileWaiting(InterruptedException e) {
+    logger.debug("Interrupted while waiting for result collection", e);
+    Thread.currentThread().interrupt();
+    if (cache != null) {
+      cache.getCancelCriterion().checkCancelInProgress(e);
+    }
+    return new FunctionException(e);
+  }
+
+  // Reduces the sub results on first use and returns the cached merge afterwards.
+  private TopEntries aggregateResults() {
+    synchronized (subResults) {
+      if (mergedResults == null) {
+        mergedResults = manager.reduce(subResults);
+      }
+      return mergedResults.getEntries();
+    }
+  }
+
+  @Override
+  public void endResults() {
+    synchronized (subResults) {
+      waitForResults.countDown();
+    }
+  }
+
+  @Override
+  public void clearResults() {
+    synchronized (subResults) {
+      failIfClosed();
+      subResults.clear();
+    }
+  }
+
+  @Override
+  public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) {
+    synchronized (subResults) {
+      failIfClosed();
+      subResults.add(resultOfSingleExecution);
+    }
+  }
+
+  // Rejects modifications once endResults() has released the latch. Callers hold the subResults monitor.
+  private void failIfClosed() {
+    if (waitForResults.getCount() == 0) {
+      throw new IllegalStateException("This collector is closed and cannot accept anymore results");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
new file mode 100644
index 0000000..4079ad4
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes used for distributing lucene queries to geode nodes. Contains the lucene related functions
+ * like {@link com.gemstone.gemfire.cache.lucene.internal.distributed.LuceneFunction} as well as objects that are
+ * passed between nodes like {@link com.gemstone.gemfire.cache.lucene.internal.distributed.EntryScore}
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java
new file mode 100644
index 0000000..8fbe356
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/ChunkKey.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * The key for a single chunk on a file stored within a region. A key is identified by the id of the file and the
+ * index of the chunk.
+ */
+public class ChunkKey implements DataSerializableFixedID {
+  UUID fileId;
+  int chunkId;
+
+  /**
+   * Constructor used for serialization only. Leaves {@code fileId} null until {@link #fromData(DataInput)} is called.
+   */
+  public ChunkKey() {
+  }
+
+  ChunkKey(UUID fileName, int chunkId) {
+    this.fileId = fileName;
+    this.chunkId = chunkId;
+  }
+
+  /**
+   * @return the id of the file this chunk belongs to
+   */
+  public UUID getFileId() {
+    return fileId;
+  }
+
+  /**
+   * @return the chunkId
+   */
+  public int getChunkId() {
+    return chunkId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    // fileId is null on an instance made by the no-arg constructor; equals() tolerates that, so hashCode() must too
+    result = prime * result + (fileId == null ? 0 : fileId.hashCode());
+    result = prime * result + chunkId;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so this also rejects a null argument
+    if (!(obj instanceof ChunkKey)) {
+      return false;
+    }
+    ChunkKey other = (ChunkKey) obj;
+    if (chunkId != other.chunkId) {
+      return false;
+    }
+    return fileId == null ? other.fileId == null : fileId.equals(other.fileId);
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {
+    return LUCENE_CHUNK_KEY;
+  }
+
+  /**
+   * Writes the chunk id followed by the two halves of the file id.
+   */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(chunkId);
+    out.writeLong(fileId.getMostSignificantBits());
+    out.writeLong(fileId.getLeastSignificantBits());
+  }
+
+  /**
+   * Reads the chunk id followed by the two halves of the file id, mirroring {@link #toData(DataOutput)}.
+   */
+  @Override
+  public void fromData(DataInput in)
+      throws IOException, ClassNotFoundException {
+    chunkId = in.readInt();
+    long high = in.readLong();
+    long low = in.readLong();
+    fileId = new UUID(high, low);
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java
new file mode 100644
index 0000000..d27717e
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/File.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.UUID;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * A file that is stored in a gemfire region.
+ */
+public class File implements DataSerializableFixedID {
+
+  // not written by toData(); set through setFileSystem()
+  private transient FileSystem fileSystem;
+  private transient int chunkSize;
+
+  private String name;
+  long length = 0;
+  int chunks = 0;
+  long created = System.currentTimeMillis();
+  long modified = created;
+  UUID id = UUID.randomUUID();
+
+  /**
+   * Constructor for serialization only
+   */
+  public File() {
+  }
+
+  File(final FileSystem fileSystem, final String name) {
+    setFileSystem(fileSystem);
+
+    this.name = name;
+  }
+
+  /**
+   * @return the name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the length
+   */
+  public long getLength() {
+    return length;
+  }
+
+  /**
+   * @return the creation time, as a {@link System#currentTimeMillis()} value
+   */
+  public long getCreated() {
+    return created;
+  }
+
+  /**
+   * @return the modification time; initially equal to the creation time
+   */
+  public long getModified() {
+    return modified;
+  }
+
+  /**
+   * Get an input stream that reads from the beginning the file
+   *
+   * The input stream is not threadsafe
+   */
+  public SeekableInputStream getInputStream() {
+    // TODO get read lock?
+    return new FileInputStream(this);
+  }
+
+  /**
+   * Get an output stream that appends to the end
+   * of the file.
+   */
+  public OutputStream getOutputStream() {
+    return new FileOutputStream(this);
+  }
+
+  // Associates this file with a file system and picks up the chunk size constant from FileSystem.
+  void setFileSystem(final FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+    this.chunkSize = FileSystem.CHUNK_SIZE;
+  }
+
+  int getChunkSize() {
+    return chunkSize;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  @Override
+  public int getDSFID() {
+    return LUCENE_FILE;
+  }
+
+  /**
+   * Writes name, length, chunk count, created and modified times and the two halves of the id, in that order.
+   */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeString(name, out);
+    out.writeLong(length);
+    out.writeInt(chunks);
+    out.writeLong(created);
+    out.writeLong(modified);
+    out.writeLong(id.getMostSignificantBits());
+    out.writeLong(id.getLeastSignificantBits());
+  }
+
+  /**
+   * Reads the fields in the order written by {@link #toData(DataOutput)}.
+   */
+  @Override
+  public void fromData(DataInput in)
+      throws IOException, ClassNotFoundException {
+    name = DataSerializer.readString(in);
+    length = in.readLong();
+    chunks = in.readInt();
+    created = in.readLong();
+    modified = in.readLong();
+    long high = in.readLong();
+    long low = in.readLong();
+    id = new UUID(high, low);
+  }
+
+
+  /**
+   * Export this to a {@link java.io.File}
+   *
+   * @param exportLocation the directory in which a file named after {@link #getName()} is created
+   * @throws InternalGemFireError if the content cannot be copied
+   */
+  public void export(final java.io.File exportLocation)
+  {
+    java.io.File targetFile = new java.io.File(exportLocation, getName());
+    // Files.copy does not close the stream it reads from, so close it here whether or not the copy succeeds
+    try (InputStream in = getInputStream()) {
+      Files.copy(in, targetFile.toPath());
+    }
+    catch (IOException e) {
+      throw new InternalGemFireError("Could not export file " + getName(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java
new file mode 100644
index 0000000..18194aa
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileInputStream.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * An input stream that reads chunks from
+ * a File saved in the region. This input stream
+ * will keep going back to the region to look for
+ * chunks until nothing is found.
+ */
+final class FileInputStream extends SeekableInputStream {
+
+ private final File file;
+ private byte[] chunk = null;
+ private int chunkPosition = 0;
+ private int chunkId = 0;
+ private boolean open = true;
+
+ public FileInputStream(File file) {
+ this.file = file;
+ nextChunk();
+ }
+
+ public FileInputStream(FileInputStream other) {
+ this.file = other.file;
+ this.chunk = other.chunk;
+ this.chunkId = other.chunkId;
+ this.chunkPosition = other.chunkPosition;
+ this.open = other.open;
+ }
+
+ @Override
+ public int read() throws IOException {
+ assertOpen();
+
+ checkAndFetchNextChunk();
+
+ if (null == chunk) {
+ return -1;
+ }
+
+ return chunk[chunkPosition++] & 0xff;
+ }
+
+ @Override
+ public void seek(long position) throws IOException {
+ if(position > file.length) {
+ throw new EOFException();
+ }
+ int targetChunk = (int) (position / file.getChunkSize());
+ int targetPosition = (int) (position % file.getChunkSize());
+
+ if(targetChunk != (this.chunkId - 1)) {
+ chunk = file.getFileSystem().getChunk(this.file, targetChunk);
+ chunkId = targetChunk + 1;
+ chunkPosition = targetPosition;
+ } else {
+ chunkPosition = targetPosition;
+ }
+ }
+
+
+
+ @Override
+ public long skip(long n) throws IOException {
+ int currentPosition = (chunkId - 1) * file.getChunkSize() + chunkPosition;
+ seek(currentPosition + n);
+ return n;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ seek(0);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ assertOpen();
+
+ checkAndFetchNextChunk();
+
+ if (null == chunk) {
+ return -1;
+ }
+
+ int read = 0;
+ while (len > 0) {
+ final int min = Math.min(remaining(), len);
+ System.arraycopy(chunk, chunkPosition, b, off, min);
+ off += min;
+ len -= min;
+ chunkPosition += min;
+ read += min;
+
+ if (len > 0) {
+ // we read to the end of the chunk, fetch another.
+ nextChunk();
+ if (null == chunk) {
+ break;
+ }
+ }
+ }
+
+ return read;
+ }
+
+ @Override
+ public int available() throws IOException {
+ assertOpen();
+
+ return remaining();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (open) {
+ open = false;
+ }
+ }
+
+ private int remaining() {
+ return chunk.length - chunkPosition;
+ }
+
+ private void checkAndFetchNextChunk() {
+ if (null != chunk && remaining() <= 0) {
+ nextChunk();
+ }
+ }
+
+ private void nextChunk() {
+ chunk = file.getFileSystem().getChunk(this.file, chunkId++);
+ chunkPosition = 0;
+ }
+
+ private void assertOpen() throws IOException {
+ if (!open) {
+ throw new IOException("Closed");
+ }
+ }
+
+ @Override
+ public FileInputStream clone() {
+ return new FileInputStream(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java
new file mode 100644
index 0000000..3f9f614
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileOutputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * An output stream that appends to a File by buffering up to one chunk of
+ * data and storing each chunk through the file's FileSystem. The file's
+ * length, chunk count and modified time are only written back when the
+ * stream is closed.
+ */
+final class FileOutputStream extends OutputStream {
+
+  private final File file;
+  /** Holds the chunk being assembled; set to null once the stream is closed. */
+  private ByteBuffer buffer;
+  private boolean open = true;
+  /** Total file length including bytes still sitting in the buffer. */
+  private long length;
+  /** Id that the next flushed chunk will be stored under. */
+  private int chunks;
+
+  /**
+   * Opens {@code file} for appending.
+   */
+  public FileOutputStream(final File file) {
+    this.file = file;
+    buffer = ByteBuffer.allocate(file.getChunkSize());
+    this.length = file.length;
+    this.chunks = file.chunks;
+    if(chunks > 0 && file.length % file.getChunkSize() != 0) {
+      //If the last chunk was incomplete, we're going to update it
+      //rather than add a new chunk. This guarantees that all chunks
+      //are full except for the last chunk.
+      chunks--;
+      byte[] previousChunkData = file.getFileSystem().getChunk(file, chunks);
+      buffer.put(previousChunkData);
+    }
+  }
+
+  /**
+   * Appends a single byte, flushing the buffer first if it is full.
+   *
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public void write(final int b) throws IOException {
+    assertOpen();
+
+    if (buffer.remaining() == 0) {
+      flushBuffer();
+    }
+
+    buffer.put((byte) b);
+    length++;
+  }
+
+  /**
+   * Appends {@code len} bytes from {@code b} starting at {@code off},
+   * flushing a chunk each time the buffer fills up.
+   *
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public void write(final byte[] b, int off, int len) throws IOException {
+    assertOpen();
+
+    while (len > 0) {
+      if (buffer.remaining() == 0) {
+        flushBuffer();
+      }
+
+      final int min = Math.min(buffer.remaining(), len);
+      buffer.put(b, off, min);
+      off += min;
+      len -= min;
+      length += min;
+    }
+  }
+
+  /**
+   * Flushes any buffered bytes and writes the updated metadata back to the
+   * file system. Closing an already closed stream does nothing.
+   */
+  @Override
+  public void close() throws IOException {
+    if (open) {
+      // Never store a zero-length chunk: it would bump the chunk count, and a
+      // later append would then add data AFTER the empty chunk, breaking the
+      // position / chunkSize arithmetic used when seeking.
+      if (buffer.position() > 0) {
+        flushBuffer();
+      }
+      file.modified = System.currentTimeMillis();
+      file.length = length;
+      file.chunks = chunks;
+      file.getFileSystem().updateFile(file);
+      open = false;
+      buffer = null;
+    }
+  }
+
+  /** Stores the buffered bytes as the next chunk and empties the buffer. */
+  private void flushBuffer() {
+    final int start = buffer.arrayOffset();
+    // The end index must be relative to the backing array, hence start + position.
+    byte[] chunk = Arrays.copyOfRange(buffer.array(), start, start + buffer.position());
+    file.getFileSystem().putChunk(file, chunks++, chunk);
+    buffer.clear();
+  }
+
+  private void assertOpen() throws IOException {
+    if (!open) {
+      throw new IOException("Closed");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
new file mode 100644
index 0000000..5f4fb77
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystem.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A filesystem-like interface that stores file data in geode regions.
+ *
+ * This filesystem is safe for use with multiple threads if the threads are not
+ * modifying the same files. A single file is not safe to modify by multiple
+ * threads, even between different members of the distributed system.
+ *
+ * Changes to a file may not be visible to other members of the system until the
+ * FileOutputStream is closed.
+ *
+ */
+public class FileSystem {
+  private final ConcurrentMap<String, File> fileRegion;
+  private final ConcurrentMap<ChunkKey, byte[]> chunkRegion;
+
+  static final int CHUNK_SIZE = 1024 * 1024; //1 MB
+  private final FileSystemStats stats;
+
+  /**
+   * Create filesystem that will store data in the two provided regions. The fileRegion contains
+   * metadata about the files, and the chunkRegion contains the actual data. If data from either region is missing
+   * or inconsistent, no guarantees are made about what this class will do, so it's best if these regions are colocated
+   * and in the same disk store to ensure the data remains together.
+   * @param fileRegion the region to store metadata about the files
+   * @param chunkRegion the region to store actual file data.
+   * @param stats the statistics object updated by file and chunk operations
+   */
+  public FileSystem(ConcurrentMap<String, File> fileRegion, ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) {
+    this.fileRegion = fileRegion;
+    this.chunkRegion = chunkRegion;
+    this.stats = stats;
+  }
+
+  /**
+   * @return the key set of the file region, i.e. the names of all files
+   */
+  public Collection<String> listFileNames() {
+    return fileRegion.keySet();
+  }
+
+  /**
+   * Creates a new, empty file and registers it in the file region.
+   *
+   * @param name the name of the file
+   * @return the newly created file
+   * @throws IOException if a file with that name already exists
+   */
+  public File createFile(final String name) throws IOException {
+    // TODO lock region ?
+    final File file = new File(this, name);
+    if (null != fileRegion.putIfAbsent(name, file)) {
+      throw new IOException("File exists.");
+    }
+    stats.incFileCreates(1);
+    // TODO unlock region ?
+    return file;
+  }
+
+  /**
+   * Creates a file object that is NOT registered in the file region.
+   *
+   * @param name the name of the file
+   * @return the new, unregistered file
+   */
+  public File createTemporaryFile(final String name) throws IOException {
+    final File file = new File(this, name);
+    stats.incTemporaryFileCreates(1);
+    return file;
+  }
+
+  /**
+   * Looks up a file by name and attaches it to this file system.
+   *
+   * @param name the name of the file
+   * @return the file's metadata object
+   * @throws FileNotFoundException if no file with that name is registered
+   */
+  public File getFile(final String name) throws FileNotFoundException {
+    final File file = fileRegion.get(name);
+
+    if (null == file) {
+      throw new FileNotFoundException(name);
+    }
+
+    file.setFileSystem(this);
+    return file;
+  }
+
+  /**
+   * Removes a file's metadata and then its chunks, starting at chunk 0 and
+   * stopping at the first chunk id that is not present.
+   *
+   * @param name the name of the file
+   * @throws FileNotFoundException if no file with that name is registered
+   */
+  public void deleteFile(final String name) throws FileNotFoundException {
+    // TODO locks?
+
+    // TODO - What is the state of the system if
+    // things crash in the middle of removing this file?
+    // Seems like a file will be left with some
+    // dangling chunks at the end of the file
+    File file = fileRegion.remove(name);
+    if(file == null) {
+      throw new FileNotFoundException(name);
+    }
+
+    // TODO consider removeAll with all ChunkKeys listed.
+    final ChunkKey key = new ChunkKey(file.id, 0);
+    while (true) {
+      // TODO consider mutable ChunkKey
+      if (null == chunkRegion.remove(key)) {
+        // no more chunks
+        break;
+      }
+      key.chunkId++;
+    }
+
+    stats.incFileDeletes(1);
+  }
+
+  /**
+   * Renames a file by registering a new entry under {@code dest} that points
+   * at the same chunks (same id) and then removing the {@code source} entry.
+   *
+   * @param source the current name of the file
+   * @param dest the new name of the file
+   * @throws FileNotFoundException if {@code source} is not registered
+   * @throws IOException if {@code dest} already exists
+   */
+  public void renameFile(String source, String dest) throws IOException {
+    final File sourceFile = fileRegion.get(source);
+    if (null == sourceFile) {
+      throw new FileNotFoundException(source);
+    }
+
+    final File destFile = createFile(dest);
+
+    destFile.chunks = sourceFile.chunks;
+    destFile.created = sourceFile.created;
+    destFile.length = sourceFile.length;
+    destFile.modified = sourceFile.modified;
+    destFile.id = sourceFile.id;
+    updateFile(destFile);
+
+    // TODO - What is the state of the system if
+    // things crash in the middle of moving this file?
+    // Seems like we will have two files pointing
+    // at the same data
+
+    fileRegion.remove(source);
+
+    stats.incFileRenames(1);
+  }
+
+  /**
+   * Fetches one chunk of a file.
+   *
+   * @param file the file the chunk belongs to
+   * @param id the zero-based chunk id
+   * @return the chunk data, or {@code null} if the id is beyond the file's
+   *         chunk count or the chunk is missing from the chunk region
+   */
+  byte[] getChunk(final File file, final int id) {
+    final ChunkKey key = new ChunkKey(file.id, id);
+
+    //The file's metadata indicates that this chunk shouldn't
+    //exist. Purge all of the chunks that are larger than the file metadata
+    if(id >= file.chunks) {
+      while(chunkRegion.containsKey(key)) {
+        chunkRegion.remove(key);
+        key.chunkId++;
+      }
+
+      return null;
+    }
+
+    final byte[] chunk = chunkRegion.get(key);
+    // The chunk can be missing if the regions are inconsistent; report it as
+    // "no chunk" instead of failing with a NullPointerException here.
+    if (null != chunk) {
+      stats.incReadBytes(chunk.length);
+    }
+    return chunk;
+  }
+
+  /**
+   * Stores one chunk of a file and records the bytes written.
+   *
+   * @param file the file the chunk belongs to
+   * @param id the zero-based chunk id
+   * @param chunk the chunk data
+   */
+  public void putChunk(final File file, final int id, final byte[] chunk) {
+    final ChunkKey key = new ChunkKey(file.id, id);
+    chunkRegion.put(key, chunk);
+    stats.incWrittenBytes(chunk.length);
+  }
+
+  /** Writes the file's metadata to the file region under the file's name. */
+  void updateFile(File file) {
+    fileRegion.put(file.getName(), file);
+  }
+
+  public ConcurrentMap<String, File> getFileRegion() {
+    return fileRegion;
+  }
+
+  public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() {
+    return chunkRegion;
+  }
+
+  /**
+   * Export all of the files in the filesystem to the provided directory
+   */
+  public void export(final java.io.File exportLocation) {
+    for (final String fileName : listFileNames()) {
+      try {
+        getFile(fileName).export(exportLocation);
+      }
+      catch (FileNotFoundException ignored) {
+        //ignore this, it was concurrently removed
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java
new file mode 100644
index 0000000..e6bbf0d
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/FileSystemStats.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.statistics.StatisticsTypeFactoryImpl;
+
+/**
+ * Statistics for the region-backed file system: counters for bytes read and
+ * written and for file creates, temporary creates, deletes and renames, plus
+ * supplier-backed gauges for the number of files, chunks and bytes.
+ */
+public class FileSystemStats {
+  private static final StatisticsType statsType;
+  private static final String statsTypeName = "FileSystemStats";
+  private static final String statsTypeDescription = "Statistics about in memory file system implementation";
+
+  private final Statistics stats;
+
+  private static final int readBytesId;
+  private static final int writtenBytesId;
+  private static final int fileCreatesId;
+  private static final int temporaryFileCreatesId;
+  private static final int fileDeletesId;
+  private static final int fileRenamesId;
+  private static final int filesId;
+  private static final int chunksId;
+  private static final int bytesId;
+
+  static {
+    final StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
+    statsType = f.createType(
+        statsTypeName,
+        statsTypeDescription,
+        new StatisticDescriptor[] {
+            // Descriptions must match the stat they are attached to
+            // (they were previously swapped between readBytes and writtenBytes).
+            f.createLongCounter("readBytes", "Number of bytes read", "bytes"),
+            f.createLongCounter("writtenBytes", "Number of bytes written", "bytes"),
+            f.createIntCounter("fileCreates", "Number of files created", "files"),
+            f.createIntCounter("temporaryFileCreates", "Number of temporary files created", "files"),
+            f.createIntCounter("fileDeletes", "Number of files deleted", "files"),
+            f.createIntCounter("fileRenames", "Number of files renamed", "files"),
+            f.createIntGauge("files", "Number of files on this member", "files"),
+            f.createIntGauge("chunks", "Number of file chunks on this member", "chunks"),
+            f.createLongGauge("bytes", "Number of bytes on this member", "bytes"),
+        }
+    );
+
+    readBytesId = statsType.nameToId("readBytes");
+    writtenBytesId = statsType.nameToId("writtenBytes");
+    fileCreatesId = statsType.nameToId("fileCreates");
+    temporaryFileCreatesId = statsType.nameToId("temporaryFileCreates");
+    fileDeletesId = statsType.nameToId("fileDeletes");
+    fileRenamesId = statsType.nameToId("fileRenames");
+    filesId = statsType.nameToId("files");
+    chunksId = statsType.nameToId("chunks");
+    bytesId = statsType.nameToId("bytes");
+  }
+
+  /**
+   * @param f the factory used to create the underlying statistics instance
+   * @param name the name given to the statistics instance
+   */
+  public FileSystemStats(StatisticsFactory f, String name) {
+    this.stats = f.createAtomicStatistics(statsType, name);
+  }
+
+  /** Adds {@code delta} to the readBytes counter. */
+  public void incReadBytes(int delta) {
+    stats.incLong(readBytesId, delta);
+  }
+
+  /** Adds {@code delta} to the writtenBytes counter. */
+  public void incWrittenBytes(int delta) {
+    stats.incLong(writtenBytesId, delta);
+  }
+
+  /** Adds {@code delta} to the fileCreates counter. */
+  public void incFileCreates(final int delta) {
+    stats.incInt(fileCreatesId, delta);
+  }
+
+  /** Adds {@code delta} to the temporaryFileCreates counter. */
+  public void incTemporaryFileCreates(final int delta) {
+    stats.incInt(temporaryFileCreatesId, delta);
+  }
+
+  /** Adds {@code delta} to the fileDeletes counter. */
+  public void incFileDeletes(final int delta) {
+    stats.incInt(fileDeletesId, delta);
+  }
+
+  /** Adds {@code delta} to the fileRenames counter. */
+  public void incFileRenames(final int delta) {
+    stats.incInt(fileRenamesId, delta);
+  }
+
+  /** Registers the supplier that provides the value of the files gauge. */
+  public void setFileSupplier(IntSupplier supplier) {
+    stats.setIntSupplier(filesId, supplier);
+  }
+
+  /** @return the current value of the files gauge */
+  public int getFiles() {
+    return stats.getInt(filesId);
+  }
+
+  /** Registers the supplier that provides the value of the chunks gauge. */
+  public void setChunkSupplier(IntSupplier supplier) {
+    stats.setIntSupplier(chunksId, supplier);
+  }
+
+  /** @return the current value of the chunks gauge */
+  public int getChunks() {
+    return stats.getInt(chunksId);
+  }
+
+  /** Registers the supplier that provides the value of the bytes gauge. */
+  public void setBytesSupplier(LongSupplier supplier) {
+    stats.setLongSupplier(bytesId, supplier);
+  }
+
+  /** @return the current value of the bytes gauge */
+  public long getBytes() {
+    return stats.getLong(bytesId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java
new file mode 100644
index 0000000..e10e0c4
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/SeekableInputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An input stream that supports seeking to a particular position.
+ */
+public abstract class SeekableInputStream extends InputStream {
+
+ /**
+  * Seek to a position in the stream. The position is relative to the beginning
+  * of the stream (in other words, just before the first byte that was ever
+  * read).
+  *
+  * @param position the offset to seek to, counted from the beginning of the stream
+  * @throws IOException if the seek goes past the end of the stream
+  */
+ public abstract void seek(long position) throws IOException;
+
+ /**
+  * Returns a copy of this stream, narrowing the return type of
+  * {@link Object#clone()} and dropping its checked exception.
+  * NOTE(review): presumably the copy keeps an independent read position -
+  * confirm against the implementations.
+  */
+ public abstract SeekableInputStream clone();
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/package-info.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/package-info.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/package-info.java
new file mode 100644
index 0000000..f8b612c
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/filesystem/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * A distributed filesystem implementation that uses a geode region as the underlying storage mechanism.
+ *
+ * Users of this filesystem should interact with the {@link com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystem} class.
+ *
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.filesystem;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java
new file mode 100644
index 0000000..ba9f73b
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexMetrics.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * Immutable value object holding query, update and commit metrics plus the
+ * document count for one Lucene index, identified by region path and index name.
+ */
+public class LuceneIndexMetrics {
+
+  // Identity of the index the metrics belong to.
+  private final String regionPath;
+  private final String indexName;
+
+  // Query metrics.
+  private final int queryExecutions;
+  private final long queryExecutionTime;
+  private final float queryRate;
+  private final long queryRateAverageLatency;
+  private final int queryExecutionsInProgress;
+  private final long queryExecutionTotalHits;
+
+  // Update metrics.
+  private final int updates;
+  private final long updateTime;
+  private final float updateRate;
+  private final long updateRateAverageLatency;
+  private final int updatesInProgress;
+
+  // Commit metrics.
+  private final int commits;
+  private final long commitTime;
+  private final float commitRate;
+  private final long commitRateAverageLatency;
+  private final int commitsInProgress;
+
+  private final int documents;
+
+  /**
+   * This constructor is to be used by internal JMX framework only. A user should
+   * not try to create an instance of this class.
+   */
+  @ConstructorProperties( { "regionPath", "indexName", "queryExecutions", "queryExecutionTime", "queryRate",
+      "queryRateAverageLatency", "queryExecutionsInProgress", "queryExecutionTotalHits", "updates",
+      "updateTime", "updateRate", "updateRateAverageLatency", "updatesInProgress", "commits",
+      "commitTime", "commitRate", "commitRateAverageLatency", "commitsInProgress", "documents"
+  })
+  public LuceneIndexMetrics(String regionPath, String indexName, int queryExecutions, long queryExecutionTime,
+      float queryRate, long queryRateAverageLatency, int queryExecutionsInProgress, long queryExecutionTotalHits,
+      int updates, long updateTime, float updateRate, long updateRateAverageLatency, int updatesInProgress,
+      int commits, long commitTime, float commitRate, long commitRateAverageLatency, int commitsInProgress,
+      int documents) {
+    this.regionPath = regionPath;
+    this.indexName = indexName;
+
+    this.queryExecutions = queryExecutions;
+    this.queryExecutionTime = queryExecutionTime;
+    this.queryRate = queryRate;
+    this.queryRateAverageLatency = queryRateAverageLatency;
+    this.queryExecutionsInProgress = queryExecutionsInProgress;
+    this.queryExecutionTotalHits = queryExecutionTotalHits;
+
+    this.updates = updates;
+    this.updateTime = updateTime;
+    this.updateRate = updateRate;
+    this.updateRateAverageLatency = updateRateAverageLatency;
+    this.updatesInProgress = updatesInProgress;
+
+    this.commits = commits;
+    this.commitTime = commitTime;
+    this.commitRate = commitRate;
+    this.commitRateAverageLatency = commitRateAverageLatency;
+    this.commitsInProgress = commitsInProgress;
+
+    this.documents = documents;
+  }
+
+  public String getRegionPath() {
+    return regionPath;
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+
+  public int getQueryExecutions() {
+    return queryExecutions;
+  }
+
+  public long getQueryExecutionTime() {
+    return queryExecutionTime;
+  }
+
+  public float getQueryRate() {
+    return queryRate;
+  }
+
+  public long getQueryRateAverageLatency() {
+    return queryRateAverageLatency;
+  }
+
+  public int getQueryExecutionsInProgress() {
+    return queryExecutionsInProgress;
+  }
+
+  public long getQueryExecutionTotalHits() {
+    return queryExecutionTotalHits;
+  }
+
+  public int getUpdates() {
+    return updates;
+  }
+
+  public long getUpdateTime() {
+    return updateTime;
+  }
+
+  public float getUpdateRate() {
+    return updateRate;
+  }
+
+  public long getUpdateRateAverageLatency() {
+    return updateRateAverageLatency;
+  }
+
+  public int getUpdatesInProgress() {
+    return updatesInProgress;
+  }
+
+  public int getCommits() {
+    return commits;
+  }
+
+  public long getCommitTime() {
+    return commitTime;
+  }
+
+  public float getCommitRate() {
+    return commitRate;
+  }
+
+  public long getCommitRateAverageLatency() {
+    return commitRateAverageLatency;
+  }
+
+  public int getCommitsInProgress() {
+    return commitsInProgress;
+  }
+
+  public int getDocuments() {
+    return documents;
+  }
+
+  /**
+   * Renders every field as {@code name=value}, separated by "; " and wrapped
+   * in {@code SimpleClassName[...]}.
+   */
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()
+        + "["
+        + "regionPath=" + regionPath
+        + "; indexName=" + indexName
+        + "; queryExecutions=" + queryExecutions
+        + "; queryExecutionTime=" + queryExecutionTime
+        + "; queryRate=" + queryRate
+        + "; queryRateAverageLatency=" + queryRateAverageLatency
+        + "; queryExecutionsInProgress=" + queryExecutionsInProgress
+        + "; queryExecutionTotalHits=" + queryExecutionTotalHits
+        + "; updates=" + updates
+        + "; updateTime=" + updateTime
+        + "; updateRate=" + updateRate
+        + "; updateRateAverageLatency=" + updateRateAverageLatency
+        + "; updatesInProgress=" + updatesInProgress
+        + "; commits=" + commits
+        + "; commitTime=" + commitTime
+        + "; commitRate=" + commitRate
+        + "; commitRateAverageLatency=" + commitRateAverageLatency
+        + "; commitsInProgress=" + commitsInProgress
+        + "; documents=" + documents
+        + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexStatsMonitor.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexStatsMonitor.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexStatsMonitor.java
new file mode 100644
index 0000000..30a2659
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneIndexStatsMonitor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
+import com.gemstone.gemfire.management.internal.ManagementStrings;
+import com.gemstone.gemfire.management.internal.beans.stats.MBeanStatsMonitor;
+import com.gemstone.gemfire.management.internal.beans.stats.StatType;
+import com.gemstone.gemfire.management.internal.beans.stats.StatsAverageLatency;
+import com.gemstone.gemfire.management.internal.beans.stats.StatsRate;
+
+/**
+ * Statistics monitor for a single {@link LuceneIndex}. It watches the statistics of the
+ * index and produces {@link LuceneIndexMetrics} snapshots from them on demand.
+ */
+public class LuceneIndexStatsMonitor extends MBeanStatsMonitor {
+
+  public static final String LUCENE_SERVICE_MXBEAN_MONITOR_PREFIX = "LuceneServiceMXBeanMonitor_";
+
+  private final StatsRate queryRate;
+
+  private final StatsRate updateRate;
+
+  private final StatsRate commitRate;
+
+  private final StatsAverageLatency queryRateAverageLatency;
+
+  private final StatsAverageLatency updateRateAverageLatency;
+
+  private final StatsAverageLatency commitRateAverageLatency;
+
+  /**
+   * Creates a monitor named after the region path and name of the given index and attaches
+   * it to the statistics of that index.
+   *
+   * @param index the index to monitor; it is cast to {@link LuceneIndexImpl} to reach its statistics
+   */
+  public LuceneIndexStatsMonitor(LuceneIndex index) {
+    super(LUCENE_SERVICE_MXBEAN_MONITOR_PREFIX + index.getRegionPath() + "_" + index.getName());
+    addStatisticsToMonitor(((LuceneIndexImpl) index).getIndexStats().getStats());
+
+    // Rates first, then the matching average latencies.
+    this.queryRate = new StatsRate(StatsKey.QUERIES, StatType.INT_TYPE, this);
+    this.updateRate = new StatsRate(StatsKey.UPDATES, StatType.INT_TYPE, this);
+    this.commitRate = new StatsRate(StatsKey.COMMITS, StatType.INT_TYPE, this);
+
+    this.queryRateAverageLatency =
+        new StatsAverageLatency(StatsKey.QUERIES, StatType.INT_TYPE, StatsKey.QUERY_TIME, this);
+    this.updateRateAverageLatency =
+        new StatsAverageLatency(StatsKey.UPDATES, StatType.INT_TYPE, StatsKey.UPDATE_TIME, this);
+    this.commitRateAverageLatency =
+        new StatsAverageLatency(StatsKey.COMMITS, StatType.INT_TYPE, StatsKey.COMMIT_TIME, this);
+  }
+
+  /**
+   * Builds a metrics snapshot from the current values of the monitored statistics.
+   *
+   * @param index the index whose region path and name label the snapshot
+   * @return a new {@link LuceneIndexMetrics} holding the current values
+   */
+  protected LuceneIndexMetrics getIndexMetrics(LuceneIndex index) {
+    // Query figures
+    int queries = readInt(StatsKey.QUERIES);
+    long queryTime = readLong(StatsKey.QUERY_TIME);
+    float queriesPerSample = this.queryRate.getRate();
+    long queryLatency = this.queryRateAverageLatency.getAverageLatency();
+    int queriesRunning = readInt(StatsKey.QUERIES_IN_PROGRESS);
+    long queryHits = readLong(StatsKey.QUERIES_TOTAL_HITS);
+
+    // Update figures
+    int updateCount = readInt(StatsKey.UPDATES);
+    long updateNanos = readLong(StatsKey.UPDATE_TIME);
+    float updatesPerSample = this.updateRate.getRate();
+    long updateLatency = this.updateRateAverageLatency.getAverageLatency();
+    int updatesRunning = readInt(StatsKey.UPDATES_IN_PROGRESS);
+
+    // Commit figures
+    int commitCount = readInt(StatsKey.COMMITS);
+    long commitNanos = readLong(StatsKey.COMMIT_TIME);
+    float commitsPerSample = this.commitRate.getRate();
+    long commitLatency = this.commitRateAverageLatency.getAverageLatency();
+    int commitsRunning = readInt(StatsKey.COMMITS_IN_PROGRESS);
+
+    int documentCount = readInt(StatsKey.DOCUMENTS);
+
+    return new LuceneIndexMetrics(
+        index.getRegionPath(), index.getName(),
+        queries, queryTime, queriesPerSample, queryLatency, queriesRunning, queryHits,
+        updateCount, updateNanos, updatesPerSample, updateLatency, updatesRunning,
+        commitCount, commitNanos, commitsPerSample, commitLatency, commitsRunning,
+        documentCount);
+  }
+
+  /** Reads the named statistic as an {@code int}. */
+  private int readInt(String statName) {
+    return getStatistic(statName).intValue();
+  }
+
+  /** Reads the named statistic as a {@code long}. */
+  private long readLong(String statName) {
+    return getStatistic(statName).longValue();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java
new file mode 100644
index 0000000..edbfadc
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceBridge.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Connects a {@link LuceneService} to the management layer. One
+ * {@link LuceneIndexStatsMonitor} is kept per registered index, keyed by region path and
+ * index name, and is used to produce {@link LuceneIndexMetrics} on request.
+ */
+public class LuceneServiceBridge {
+
+  private final LuceneService service;
+
+  private final Map<String, LuceneIndexStatsMonitor> monitors;
+
+  /**
+   * @param service the service whose indexes are reported on
+   */
+  public LuceneServiceBridge(LuceneService service) {
+    this.service = service;
+    this.monitors = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Creates a statistics monitor for the given index and registers it, replacing any
+   * monitor previously registered under the same region path and index name.
+   *
+   * @param index the index to start monitoring
+   */
+  public void addIndex(LuceneIndex index) {
+    this.monitors.put(getMonitorKey(index), new LuceneIndexStatsMonitor(index));
+  }
+
+  /**
+   * Returns the metrics of every index known to the service that has a registered monitor.
+   *
+   * @return the metrics, one element per monitored index; never contains {@code null}
+   */
+  public LuceneIndexMetrics[] listIndexMetrics() {
+    // Iterate the same snapshot that sizes the result: asking the service twice could
+    // return collections of different sizes if an index is created in between.
+    Collection<LuceneIndex> indexes = this.service.getAllIndexes();
+    List<LuceneIndexMetrics> indexMetrics = new ArrayList<>(indexes.size());
+    for (LuceneIndex index : indexes) {
+      addMetricsIfMonitored(indexMetrics, index);
+    }
+    return indexMetrics.toArray(new LuceneIndexMetrics[indexMetrics.size()]);
+  }
+
+  /**
+   * Returns the metrics of every monitored index defined on the given region.
+   *
+   * @param regionPath the region path; a leading {@link Region#SEPARATOR} is added when missing
+   * @return the metrics of the matching indexes; empty when there are none
+   */
+  public LuceneIndexMetrics[] listIndexMetrics(String regionPath) {
+    String fullPath = toFullPath(regionPath);
+    List<LuceneIndexMetrics> indexMetrics = new ArrayList<>();
+    for (LuceneIndex index : this.service.getAllIndexes()) {
+      if (index.getRegionPath().equals(fullPath)) {
+        addMetricsIfMonitored(indexMetrics, index);
+      }
+    }
+    return indexMetrics.toArray(new LuceneIndexMetrics[indexMetrics.size()]);
+  }
+
+  /**
+   * Returns the metrics of a single index.
+   *
+   * @param regionPath the region path; a leading {@link Region#SEPARATOR} is added when
+   *        missing, consistent with {@link #listIndexMetrics(String)}
+   * @param indexName the name of the index
+   * @return the metrics, or {@code null} when the service has no such index or the index
+   *         has no registered monitor
+   */
+  public LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName) {
+    LuceneIndex index = this.service.getIndex(indexName, toFullPath(regionPath));
+    return index == null ? null : getIndexMetrics(index);
+  }
+
+  /** Prefixes the path with the region separator unless it already starts with it. */
+  private static String toFullPath(String regionPath) {
+    return regionPath.startsWith(Region.SEPARATOR) ? regionPath : Region.SEPARATOR + regionPath;
+  }
+
+  /** Appends the metrics of the index to the list, skipping indexes that have no monitor. */
+  private void addMetricsIfMonitored(List<LuceneIndexMetrics> target, LuceneIndex index) {
+    LuceneIndexMetrics metrics = getIndexMetrics(index);
+    if (metrics != null) {
+      target.add(metrics);
+    }
+  }
+
+  /** Builds the key under which the monitor of an index is stored. */
+  private String getMonitorKey(LuceneIndex index) {
+    return index.getRegionPath() + "_" + index.getName();
+  }
+
+  /**
+   * Looks up the monitor of the index and asks it for a metrics snapshot.
+   *
+   * @return the metrics, or {@code null} when no monitor is registered for the index (for
+   *         example when the service already lists the index but {@link #addIndex} has not
+   *         been called for it yet)
+   */
+  private LuceneIndexMetrics getIndexMetrics(LuceneIndex index) {
+    LuceneIndexStatsMonitor monitor = this.monitors.get(getMonitorKey(index));
+    return monitor == null ? null : monitor.getIndexMetrics(index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java
new file mode 100644
index 0000000..4320a9a
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMBean.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.management.internal.beans.CacheServiceMBeanBase;
+
+import javax.management.NotificationBroadcasterSupport;
+
+/**
+ * MBean implementation of {@link LuceneServiceMXBean}. Every operation is delegated to a
+ * {@link LuceneServiceBridge} created for the wrapped {@link LuceneService}.
+ */
+public class LuceneServiceMBean extends NotificationBroadcasterSupport
+    implements LuceneServiceMXBean, CacheServiceMBeanBase {
+
+  private final LuceneServiceBridge bridge;
+
+  /**
+   * @param service the service exposed through this MBean
+   */
+  public LuceneServiceMBean(LuceneService service) {
+    bridge = new LuceneServiceBridge(service);
+  }
+
+  @Override
+  public LuceneIndexMetrics[] listIndexMetrics() {
+    return bridge.listIndexMetrics();
+  }
+
+  @Override
+  public LuceneIndexMetrics[] listIndexMetrics(String regionPath) {
+    return bridge.listIndexMetrics(regionPath);
+  }
+
+  @Override
+  public LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName) {
+    return bridge.listIndexMetrics(regionPath, indexName);
+  }
+
+  /** @return the fixed identifier {@code "LuceneService"} */
+  @Override
+  public String getId() {
+    return "LuceneService";
+  }
+
+  /** @return the management interface implemented by this MBean */
+  @Override
+  public Class getInterfaceClass() {
+    return LuceneServiceMXBean.class;
+  }
+
+  /**
+   * Hands a newly created index to the bridge so that it is included in the metrics.
+   *
+   * @param index the index to add
+   */
+  public void addIndex(LuceneIndex index) {
+    bridge.addIndex(index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java
new file mode 100644
index 0000000..e19bc83
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/LuceneServiceMXBean.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.management.internal.security.ResourceOperation;
+import org.apache.geode.security.ResourcePermission.Operation;
+import org.apache.geode.security.ResourcePermission.Resource;
+
+/**
+ * Management interface giving read access to the metrics of the
+ * {@link com.gemstone.gemfire.cache.lucene.LuceneService} in this member.
+ */
+@ResourceOperation(resource = Resource.CLUSTER, operation = Operation.READ)
+public interface LuceneServiceMXBean {
+
+  /**
+   * Lists the metrics of all {@link com.gemstone.gemfire.cache.lucene.LuceneIndex}
+   * instances defined in this member.
+   *
+   * @return one {@link LuceneIndexMetrics} per LuceneIndex defined in this member
+   */
+  LuceneIndexMetrics[] listIndexMetrics();
+
+  /**
+   * Lists the metrics of the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex}
+   * instances defined on one region in this member.
+   *
+   * @param regionPath the full path of the region whose indexes are reported
+   *
+   * @return one {@link LuceneIndexMetrics} per LuceneIndex defined on the given region in
+   *         this member
+   */
+  LuceneIndexMetrics[] listIndexMetrics(String regionPath);
+
+  /**
+   * Returns the metrics of the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex} with
+   * the given name defined on the given region in this member.
+   *
+   * @param regionPath the full path of the region the index is defined on
+   * @param indexName the name of the index
+   *
+   * @return the {@link LuceneIndexMetrics} of the LuceneIndex with the given name defined
+   *         on the given region in this member
+   */
+  LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java
new file mode 100644
index 0000000..f88058a
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/ManagementIndexListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.internal.IndexListenerAdapter;
+
+/**
+ * Index listener that forwards index creation events to a {@link LuceneServiceMBean}.
+ */
+public class ManagementIndexListener extends IndexListenerAdapter {
+
+  private final LuceneServiceMBean mbean;
+
+  /**
+   * @param mbean the MBean that is told about created indexes
+   */
+  public ManagementIndexListener(LuceneServiceMBean mbean) {
+    this.mbean = mbean;
+  }
+
+  /** Registers the newly created index with the MBean. */
+  @Override
+  public void afterIndexCreated(LuceneIndex index) {
+    this.mbean.addIndex(index);
+  }
+
+  /** Does nothing: no action is taken here when an index is about to be destroyed. */
+  @Override
+  public void beforeIndexDestroyed(LuceneIndex index) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java
new file mode 100644
index 0000000..3438937
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/management/StatsKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal.management;
+
+/**
+ * Names of the Lucene index statistics that {@link LuceneIndexStatsMonitor} reads when it
+ * configures its rates and builds {@link LuceneIndexMetrics}.
+ */
+public class StatsKey {
+
+ // Update statistics
+ public static final String UPDATES = "updates";
+ public static final String UPDATE_TIME = "updateTime";
+ public static final String UPDATES_IN_PROGRESS = "updatesInProgress";
+
+ // Commit statistics
+ public static final String COMMITS = "commits";
+ public static final String COMMIT_TIME = "commitTime";
+ public static final String COMMITS_IN_PROGRESS = "commitsInProgress";
+
+ // Query statistics
+ public static final String QUERIES = "queryExecutions";
+ public static final String QUERY_TIME = "queryExecutionTime";
+ public static final String QUERIES_IN_PROGRESS = "queryExecutionsInProgress";
+ public static final String QUERIES_TOTAL_HITS = "queryExecutionTotalHits";
+
+ // Document count statistic
+ public static final String DOCUMENTS = "documents";
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java
new file mode 100644
index 0000000..22670f3
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Internal lucene classes, not intended to be used directly.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/05e6d966/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java
new file mode 100644
index 0000000..e487884
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepository.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.Region;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Query;
+
+/**
+ * A repository interface for writing data to, and querying data from, a lucene index.
+ */
+public interface IndexRepository {
+
+  /**
+   * Creates a new entry in the lucene index.
+   *
+   * @param key the key of the entry
+   * @param value the value of the entry
+   * @throws IOException
+   */
+  void create(Object key, Object value) throws IOException;
+
+  /**
+   * Updates the entries in the lucene index.
+   *
+   * @param key the key of the entry
+   * @param value the new value of the entry
+   * @throws IOException
+   */
+  void update(Object key, Object value) throws IOException;
+
+  /**
+   * Deletes the entries in the lucene index.
+   *
+   * @param key the key of the entry
+   * @throws IOException
+   */
+  void delete(Object key) throws IOException;
+
+  /**
+   * Queries the index repository, passing the results to the collector.
+   * Only the documents with the top scores, up to the limit, will be passed
+   * to the collector, in order of score.
+   *
+   * @param query the query to run
+   * @param limit the maximum number of hits to return
+   * @param collector the class to aggregate the hits
+   *
+   * @throws IOException
+   */
+  void query(Query query, int limit, IndexResultCollector collector) throws IOException;
+
+  /**
+   * Commits the changes to the lucene index.
+   *
+   * @throws IOException
+   */
+  void commit() throws IOException;
+
+  /**
+   * @return the region of this repository
+   */
+  Region<?, ?> getRegion();
+
+  /**
+   * Checks whether this repository is closed due to
+   * underlying resources being closed or destroyed.
+   *
+   * @return true if this repository is closed.
+   */
+  boolean isClosed();
+
+  /**
+   * For debugging purposes, returns the underlying IndexWriter.
+   */
+  IndexWriter getWriter();
+
+  /**
+   * Cleans up any resources associated with this index repository.
+   */
+  void cleanup();
+}