You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rya.apache.org by pu...@apache.org on 2015/12/04 17:46:39 UTC
[27/49] incubator-rya git commit: RYA-7 POM and License Clean-up for
Apache Move
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query-ext/src/test/java/SampleJTSData.java
----------------------------------------------------------------------
diff --git a/partition/common-query-ext/src/test/java/SampleJTSData.java b/partition/common-query-ext/src/test/java/SampleJTSData.java
deleted file mode 100644
index 41df658..0000000
--- a/partition/common-query-ext/src/test/java/SampleJTSData.java
+++ /dev/null
@@ -1,171 +0,0 @@
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.io.Text;
-
-import cloudbase.core.client.BatchWriter;
-import cloudbase.core.client.CBException;
-import cloudbase.core.client.CBSecurityException;
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.Instance;
-import cloudbase.core.client.MultiTableBatchWriter;
-import cloudbase.core.client.TableExistsException;
-import cloudbase.core.client.TableNotFoundException;
-import cloudbase.core.client.mock.MockInstance;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.security.Authorizations;
-
-// For use in testing the Date Filter and Frequency Filter classes
-public class SampleJTSData
-{
-
- public static int NUM_PARTITIONS = 2;
-
-
- public static Connector initConnector()
- {
- Instance instance = new MockInstance();
-
- try
- {
- Connector connector = instance.getConnector("root", "password".getBytes());
-
- // set up table
- connector.tableOperations().create("partition");
-
- // set up root's auths
- connector.securityOperations().changeUserAuthorizations("root", new Authorizations("ALPHA,BETA,GAMMA".split(",")));
-
- return connector;
- }
- catch (CBException e)
- {
- e.printStackTrace();
- }
- catch (CBSecurityException e)
- {
- e.printStackTrace();
- }
- catch (TableExistsException e)
- {
- e.printStackTrace();
- }
-
- return null;
- }
-
- public static Collection<Map<String, String>> sampleData()
- {
- List<Map<String, String>> list = new ArrayList<Map<String, String>>();
- Map<String, String> item;
-
- item = new HashMap<String, String>();
- item.put("geometry-contour", "SDO_GEOMETRY(2007, 8307, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY(91.985, -12.108, 94.657, -12.059, 98.486, -11.988, 101.385, -12.296, 102.911, -12.569, 103.93, -12.852, 105.005, -12.531, 106.37, -12.204, 108.446, -11.503, 109.585, -10.88, 110.144, -10.207, 108.609, -9.573, 106.05, -8.535, 104.145, -7.606, 102.191, -7.522, 99.522, -7.691, 97.64, -7.606, 95.482, -7.947, 94.546, -8.084, 92.465, -8.605, 90.554, -9.366, 90.197, -10.436, 89.84, -11.729, 90.554, -12.175, 91.985, -12.108))");
- item.put("beam-name", "OPTUS D1 Ku-BAND NATIONAL A & B AUSTRALIA Downlink");
- list.add(item);
- //This is Australia
- //Points like 22S 135E are in the beam
-
- //This one is like GV
- item = new HashMap<String, String>();
- item.put("beam-name", "AMC 1 Ku-BAND ZONAL NORTH AMERICA Down HV");
- item.put("geometry-contour", "SDO_GEOMETRY(2007, 8307, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY(-70.838, 39.967, -70.506, 40.331, -70.698, 41.679, -71.179, 42.401, -71.578, 42.38, -72.994, 42.924, -74.353, 43.242, -75.715, 43.26, -77.318, 42.981, -78.684, 42.774, -80.05, 42.491, -82.005, 42.517, -83.608, 42.312, -84.977, 41.805, -86.58, 41.525, -88.127, 41.02, -89.731, 40.741, -90.905, 41.582, -92.264, 41.9, -93.861, 42.147, -95.411, 41.341, -96.257, 40.076, -97.222, 38.737, -98.011, 37.17, -98.031, 35.593, -97.691, 34.312, -96.875, 33.25, -97.307, 31.904, -97.916, 30.561, -98.702, 29.295, -99.134, 27.949, -98.14, 26.884, -97.205, 25.821, -95.842, 25.803, -94.42, 25.784, -92.876, 26.064, -91.277, 26.043, -90.085, 26.553, -88.729, 26.01, -87.38, 24.941, -86.031, 23.797, -84.616, 23.253, -83.256, 23.01, -81.887, 23.517, -80.866, 24.555, -80.254, 26.124, -79.642, 27.693, -78.444, 28.728, -77.486, 29.542, -76.463, 30.805, -76.088, 32.377, -75.656, 33.723, -76.051,
35.305, -75.442, 36.649, -74.426, 37.386, -73.228, 38.422, -72.032, 39.232, -70.838, 39.967))");
- list.add(item);
- //This is North America
- //Points 39°44'21.00"N 104°59'3.00"W (Denver) are in the footprint
-
- item = new HashMap<String, String>();
- item.put("beam-name", "testa");
- item.put("beam-footprint", "MULTIPOLYGON (((-169.286 40.431, -164.971 39.992, -155.397 38.482, -146.566 36.233, -136.975 32.539, -128.124 27.742, -121.946 24.548, -116.849 21.339, -112.156 17.479, -109.391 14.206, -107.301 11.715, -105.274 9.477, -103.443 8.229, -102.108 7.7, -99.109 7.428, -96.681 7.745, -93.894 8.843, -89.917 11.687, -85.953 15.017, -81.148 17.266, -78.145 17.986, -75.582 17.887, -68.1 17.987, -64.696 18.493, -61.445 19.38, -60.094 20.288, -59.315 21.564, -57.026 26.51, -55.089 30.962, -53.59 33.657, -52.495 34.691, -50.468 36.204, -46.146 38.672, -41.684 40.663, -37.914 42.055, -33.806 43.082, -27.523 44.149, -21.645 44.96, -16.578 45.406, -13.807 45.771, -14.929 50.108, -16.186 53.919, -17.051 56.0, -18.388 58.824, -19.861 61.567, -21.807 64.188, -23.104 65.742, -25.28 67.904, -27.699 69.823, -28.955 70.728, -32.415 72.768, -34.968 73.998, -38.468 75.309, -48.292 73.025, -56.545 71.12, -64.023 70.474, -72.753 70.357, -78.41 70.827, -80.466 71.093, -82.412
71.876, -83.02 72.944, -83.175 74.04, -82.493 74.782, -82.412 75.552, -82.697 76.778, -84.041 78.398, -86.316 81.078, -104.098 80.819, -110.861 80.482, -115.73 80.17, -120.936 79.669, -125.84 79.176, -126.696 79.02, -134.316 77.732, -139.505 76.478, -144.823 74.826, -148.231 73.417, -151.517 71.687, -153.87 70.165, -154.536 69.672, -155.868 68.678, -156.482 68.098, -158.281 66.421, -159.716 64.804, -160.996 63.126, -161.878 61.786, -163.046 59.875, -164.369 57.254, -165.563 54.479, -166.73 51.089, -167.811 47.267, -168.581 44.041, -169.286 40.431)), ((-171.333 23.244, -171.523 18.894, -170.127 18.986, -161.559 18.555, -156.977 18.134, -153.574 18.116, -151.108 18.324, -149.947 18.45, -149.018 18.957, -148.515 19.822, -148.524 20.914, -149.018 21.766, -149.947 22.272, -152.185 23.054, -155.563 23.434, -158.075 23.75, -160.272 24.034, -162.184 24.008, -163.514 23.99, -164.595 23.976, -166.52 23.687, -169.159 23.18, -171.333 23.244)))");
- list.add(item);
-// this point should be in there...
- // -164 40 - somewhere near hawaii
-
- item = new HashMap<String, String>();
- item.put("beam-name", "testb");
- item.put("beam-footprint", "POLYGON ((-140.153 34.772, -140.341 33.272, -137.024 33.026, -132.723 32.369, -130.947 31.916, -128.664 31.225, -125.293 29.612, -121.813 27.871, -118.699 25.892, -115.589 23.79, -112.593 21.875, -109.136 19.335, -106.939 16.701, -105.006 14.97, -104.195 14.407, -103.049 13.659, -100.363 12.717, -98.063 12.288, -94.299 11.612, -90.825 11.097, -87.997 11.584, -86.815 12.109, -86.163 12.893, -85.014 14.342, -83.804 15.788, -82.104 16.998, -80.413 17.269, -78.005 16.574, -76.181 16.531, -74.65 16.68, -73.552 17.392, -72.957 18.3, -72.917 19.651, -73.526 21.325, -74.913 23.018, -76.036 24.519, -76.159 26.428, -75.741 28.447, -74.257 30.072, -72.771 31.331, -70.517 34.328, -69.638 36.04, -68.624 39.467, -68.015 41.851, -67.607 43.501, -67.548 45.528, -67.586 47.308, -68.601 49.066, -69.868 50.07, -71.621 50.778, -73.285 50.888, -74.9 50.926, -76.994 50.975, -79.332 50.846, -81.066 50.887, -83.842 51.136, -86.569 51.016, -87.95 50.864, -90.831 50.563, -94
.27 50.644, -98.068 50.733, -102.937 51.032, -106.455 51.484, -109.973 51.936, -114.119 52.402, -117.363 53.031, -119.899 53.276, -123.243 53.539, -127.017 54.427, -130.519 55.431, -133.643 56.058, -134.826 56.279, -135.354 55.029, -135.792 53.864, -136.168965072136 52.8279962761917, -136.169 52.828, -136.169497186166 52.8264970826432, -136.192 52.763, -136.556548517884 51.6453176911637, -136.703232746756 51.2152965828266, -136.781220290925 50.9919311116929, -136.793 50.959, -136.80274055379 50.9259886895048, -136.992 50.295, -137.200898649547 49.5808675274021, -137.202 49.581, -137.200962495599 49.5806459535167, -137.360714473458 49.0197683891632, -137.459 48.677, -137.462166719028 48.6649126473121, -137.471 48.634, -137.515105536699 48.4619710228524, -137.74710368039 47.5528216167105, -137.793718522461 47.3758260237407, -137.854 47.152, -137.977773277882 46.6610808974241, -138.044 46.403, -138.330834102374 45.1674736036557, -138.365 45.019, -138.38180854655 44.9421315900087, -138.
449801069917 44.6389849661384, -138.485 44.484, -138.497077239724 44.4262941289417, -138.536 44.25, -138.622787032392 43.8206200438395, -138.743816168807 43.232032787661, -138.981390224617 42.0843314825185, -138.989 42.048, -138.990605533614 42.0389442888447, -138.991 42.037, -138.997785044232 41.9994454595406, -139.004 41.969, -139.035645873997 41.7890661698517, -139.061212567475 41.6462082823816, -139.428 39.584, -139.673 38.073, -139.713116752585 37.8001474769807, -139.766 37.457, -139.764942047737 37.4567768906428, -139.898 36.573, -139.897723683259 36.5729429963606, -139.986 35.994, -140.04777653037 35.5462970502163, -140.094 35.232, -140.090797568766 35.2315144621917, -140.153 34.772))");
- list.add(item);
-
-
-
-	//London is in neither - 51°30'0.00"N 0° 7'0.00"W
- return list;
- }
-
-
- public static void writeDenSerialized(Connector connector, Collection<Map<String, String>> data)
- {
- // write sample data
- MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1);
- try
- {
- BatchWriter writer;
- if (mtbw != null)
- {
- writer = mtbw.getBatchWriter("partition");
- }
- else
- {
- writer = connector.createBatchWriter("partition", 200000, 10000, 1);
- }
- int count = 0;
- Mutation m;
- for (Map<String, String> object : data)
- {
- count++;
- String id = (count < 10 ? "0" + count : "" + count);
- Text partition = new Text("" + (count % NUM_PARTITIONS));
-
- StringBuilder value = new StringBuilder();
- boolean first = true;
- for (Entry<String, String> entry : object.entrySet())
- {
- if (!first)
- {
- value.append("\u0000");
- }
- else
- {
- first = false;
- }
- value.append(entry.getKey());
- value.append("\uFFFD");
- value.append(entry.getValue());
-
- // write the general index mutation
- m = new Mutation(partition);
- m.put("index", entry.getValue() + "\u0000" + id, "");
- writer.addMutation(m);
-
- // write the specific index mutation
- m = new Mutation(partition);
- m.put("index", entry.getKey() + "//" + entry.getValue() + "\u0000" + id, "");
- writer.addMutation(m);
- }
-
- // write the event mutation
- m = new Mutation(partition);
- m.put("event", id, value.toString());
- writer.addMutation(m);
- }
- writer.close();
- }
- catch (CBException e)
- {
- e.printStackTrace();
- }
- catch (CBSecurityException e)
- {
- e.printStackTrace();
- }
- catch (TableNotFoundException e)
- {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/pom.xml
----------------------------------------------------------------------
diff --git a/partition/common-query/pom.xml b/partition/common-query/pom.xml
deleted file mode 100644
index 6db84bf..0000000
--- a/partition/common-query/pom.xml
+++ /dev/null
@@ -1,103 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <!--<parent>-->
- <!--<groupId>sitestore</groupId>-->
- <!--<artifactId>sitestore</artifactId>-->
- <!--<version>2.0.0-SNAPSHOT</version>-->
- <!--</parent>-->
-
- <parent>
- <groupId>mvm.rya</groupId>
- <artifactId>parent</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <groupId>sitestore.common</groupId>
- <artifactId>common-query</artifactId>
- <name>common-query (${project.version})</name>
- <version>2.0.0-SNAPSHOT</version>
- <description>A set of filters and iterators for cloudbase queries</description>
-
- <properties>
- <skipTests>true</skipTests>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.1.2</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <phase>install</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.7.2</version>
- <configuration>
- <skipTests>${skipTests}</skipTests>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <!--<scm>-->
- <!--<connection>${scmLocation}/tto/ss/common/trunk/common-query</connection>-->
- <!--</scm>-->
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.14</version>
- </dependency>
- <dependency>
- <groupId>cloudbase</groupId>
- <artifactId>cloudbase-core</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.0.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>cloudbase</groupId>
- <artifactId>cloudbase-start</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>1.3</version>
- </dependency>
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>thrift</artifactId>
- </dependency>
- <dependency>
- <groupId>com.vividsolutions</groupId>
- <artifactId>jts</artifactId>
- <version>1.11</version>
- </dependency>
- <dependency>
- <groupId>xerces</groupId>
- <artifactId>xercesImpl</artifactId>
- <version>2.8.1</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java
deleted file mode 100644
index e0126fa..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.io.Text;
-
-import ss.cloudbase.core.iterators.filter.ogc.OGCFilter;
-import cloudbase.core.data.ByteSequence;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-import cloudbase.core.iterators.WrappingIterator;
-
-public class CellLevelFilteringIterator extends WrappingIterator {
- private static final Collection<ByteSequence> EMPTY_SET = Collections.emptySet();
-
- /** The OGC Filter string **/
- public static final String OPTION_FILTER = "filter";
-
- /** The character or characters that defines the end of the field in the column qualifier. Defaults to '@' **/
- public static final String OPTION_FIELD_END = "fieldEnd";
-
- protected SortedKeyValueIterator<Key, Value> checkSource;
-
- protected Map<String, Boolean> cache = new HashMap<String, Boolean>();
-
- protected OGCFilter filter;
-
- protected String fieldEnd = "@";
-
- public CellLevelFilteringIterator() {}
-
- public CellLevelFilteringIterator(CellLevelFilteringIterator other, IteratorEnvironment env) {
- setSource(other.getSource().deepCopy(env));
- checkSource = other.checkSource.deepCopy(env);
- cache = other.cache;
- fieldEnd = other.fieldEnd;
- }
-
- @Override
- public CellLevelFilteringIterator deepCopy(IteratorEnvironment env) {
- return new CellLevelFilteringIterator(this, env);
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- if (source instanceof GMDenIntersectingIterator) {
- checkSource = ((GMDenIntersectingIterator) source).docSource.deepCopy(env);
- } else if (source instanceof SortedRangeIterator) {
- checkSource = ((SortedRangeIterator) source).docSource.deepCopy(env);
- } else {
- checkSource = source.deepCopy(env);
- }
- filter = new OGCFilter();
- filter.init(options);
-
- if (options.containsKey(OPTION_FIELD_END)) {
- fieldEnd = options.get(OPTION_FIELD_END);
- }
- }
-
- @Override
- public void next() throws IOException {
- getSource().next();
- findTop();
- }
-
- protected String getDocId(Key key) {
- String colq = key.getColumnQualifier().toString();
- int i = colq.indexOf("\u0000");
- if (i == -1) {
- i = colq.length();
- }
- return colq.substring(0, i);
- }
-
- protected Key getRecordStartKey(Key key, String docId) {
- return new Key(key.getRow(), key.getColumnFamily(), new Text(docId + "\u0000"));
- }
-
- protected Key getRecordEndKey(Key key, String docId) {
- return new Key(key.getRow(), key.getColumnFamily(), new Text(docId + "\u0000\uFFFD"));
- }
-
- protected String getField(Key key, Value value) {
- String colq = key.getColumnQualifier().toString();
- int i = colq.indexOf("\u0000");
- if (i == -1) {
- return null;
- }
-
- int j = colq.indexOf(fieldEnd, i + 1);
- if (j == -1) {
- j = colq.length();
- }
-
- return colq.substring(i + 1, j);
- }
-
- protected String getValue(Key key, Value value) {
- return value.toString();
- }
-
- protected void findTop() throws IOException {
- boolean goodKey;
- String docId;
- Map<String, String> record = new HashMap<String, String>();
-
- while (getSource().hasTop()) {
- docId = getDocId(getSource().getTopKey());
-
- // if the document is in the cache, then we have already scanned it
- if (cache.containsKey(docId)) {
- goodKey = cache.get(docId);
- } else {
- // we need to scan the whole record into a map and evaluate the filter
-
- // seek the check source to the beginning of the record
- Range range = new Range(
- getRecordStartKey(getSource().getTopKey(), docId),
- true,
- getRecordEndKey(getSource().getTopKey(), docId),
- true
- );
-
- checkSource.seek(range, EMPTY_SET, false);
-
- // read in the record to the map
- record.clear();
- while (checkSource.hasTop()) {
- String field = getField(checkSource.getTopKey(), checkSource.getTopValue());
- if (field != null) {
- record.put(field, getValue(checkSource.getTopKey(), checkSource.getTopValue()));
- }
- checkSource.next();
- }
-
- // evaluate the filter
- goodKey = filter.accept(record);
-
- // cache the result so that we don't do this for every cell
- cache.put(docId, goodKey);
- }
-
- if (goodKey==true)
- return;
- getSource().next();
- }
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- getSource().seek(range, columnFamilies, inclusive);
- findTop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java
deleted file mode 100644
index 1f59882..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.io.Text;
-
-import ss.cloudbase.core.iterators.filter.CBConverter;
-
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SkippingIterator;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-
-public class CellLevelRecordIterator extends SkippingIterator {
- public static final String OPTION_FIELD_END = "fieldEnd";
- public static final String OPTION_MULTIPLE_DELIMITER = "multipleDelimiter";
-
- protected String multipleDelimiter = ",";
-
- protected Key topKey;
- protected Value topValue;
- protected String fieldEnd = "@";
- protected String docId = null;
- protected CBConverter converter = new CBConverter();
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- CellLevelRecordIterator itr = new CellLevelRecordIterator();
- itr.setSource(this.getSource().deepCopy(env));
- itr.fieldEnd = this.fieldEnd;
- return itr;
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- converter.init(options);
- if (options.containsKey(OPTION_FIELD_END)) {
- fieldEnd = options.get(OPTION_FIELD_END);
- }
-
- if (options.containsKey(OPTION_MULTIPLE_DELIMITER)) {
- multipleDelimiter = options.get(OPTION_MULTIPLE_DELIMITER);
- }
- }
-
- @Override
- public void next() throws IOException {
- consume();
- }
-
- @Override
- public boolean hasTop() {
- return getSource().hasTop() || topKey != null || topValue != null;
- }
-
- @Override
- public Key getTopKey() {
- return topKey;
- }
-
- @Override
- public Value getTopValue() {
- return topValue;
- }
-
- protected String getDocId(Key key) {
- String colq = key.getColumnQualifier().toString();
- int i = colq.indexOf("\u0000");
- if (i == -1) {
- i = colq.length();
- }
- return colq.substring(0, i);
- }
-
- protected Key buildTopKey(Key key, String docId) {
- return new Key(key.getRow(), key.getColumnFamily(), new Text(docId), key.getColumnVisibility(), key.getTimestamp());
- }
-
- protected String getField(Key key, Value value) {
- String colq = key.getColumnQualifier().toString();
- int i = colq.indexOf("\u0000");
- if (i == -1) {
- return null;
- }
-
- int j = colq.indexOf(fieldEnd, i + 1);
- if (j == -1) {
- j = colq.length();
- }
-
- return colq.substring(i + 1, j);
- }
-
- protected String getValue(Key key, Value value) {
- return value.toString();
- }
-
- protected Key getRecordStartKey(Key key, String docId) {
- return new Key(key.getRow(), key.getColumnFamily(), new Text(docId));
- }
-
- protected Key getRecordEndKey(Key key, String docId) {
- return new Key(key.getRow(), key.getColumnFamily(), new Text(docId + "\u0000\uFFFD"));
- }
-
- @Override
- protected void consume() throws IOException {
- // build the top key
- if (getSource().hasTop()) {
- docId = getDocId(getSource().getTopKey());
- topKey = buildTopKey(getSource().getTopKey(), docId);
-
- Range range = new Range(
- getRecordStartKey(getSource().getTopKey(), docId),
- true,
- getRecordEndKey(getSource().getTopKey(), docId),
- true
- );
-
- Map<String, String> record = new HashMap<String, String>();
- while (getSource().hasTop() && range.contains(getSource().getTopKey())) {
- String field = getField(getSource().getTopKey(), getSource().getTopValue());
- if (field != null) {
- if (record.get(field) == null) {
- record.put(field, getValue(getSource().getTopKey(), getSource().getTopValue()));
- } else {
- record.put(field, record.get(field) + multipleDelimiter + getValue(getSource().getTopKey(), getSource().getTopValue()));
- }
- }
- getSource().next();
- }
-
- topValue = converter.toValue(record);
- } else {
- topKey = null;
- topValue = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java
deleted file mode 100644
index 5e75334..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import ss.cloudbase.core.iterators.conversion.Operation;
-import ss.cloudbase.core.iterators.filter.CBConverter;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-import cloudbase.core.iterators.WrappingIterator;
-
-public class ConversionIterator extends WrappingIterator {
- public static final String OPTION_CONVERSIONS = "conversions";
- public static final String OPTION_MULTI_DOC = "multiDoc";
- /** The character or characters that defines the end of the field in the column qualifier. Defaults to '@' **/
- public static final String OPTION_FIELD_END = "fieldEnd";
-
- protected CBConverter serializedConverter;
- protected Map<String, Operation> conversions;
- protected boolean multiDoc = false;
- protected String fieldEnd = "@";
-
- public ConversionIterator() {}
-
- public ConversionIterator(ConversionIterator other) {
- this.conversions.putAll(other.conversions);
- this.multiDoc = other.multiDoc;
- this.serializedConverter = other.serializedConverter;
- }
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new ConversionIterator(this);
- }
-
- @Override
- public Value getTopValue() {
- if (hasTop()) {
- if (conversions != null) {
- if (multiDoc) {
- return multiDocConvert(super.getTopValue());
- } else {
- return convert(super.getTopValue());
- }
- }
- }
- return super.getTopValue();
- }
-
- protected String getMultiDocField(Key key) {
- String colq = key.getColumnQualifier().toString();
- int start = colq.indexOf("\u0000");
- if (start == -1) {
- return null;
- }
-
- int end = colq.indexOf(fieldEnd, start + 1);
- if (end == -1) {
- end = colq.length();
- }
-
- return colq.substring(start + 1, end);
- }
-
- protected Value multiDocConvert(Value value) {
- String field = getMultiDocField(getTopKey());
- if (conversions.containsKey(field)) {
- String newValue = conversions.get(field).execute(value.toString());
- return new Value(newValue.getBytes());
- } else {
- return value;
- }
- }
-
- protected Value convert(Value value) {
- Map<String, String> record = serializedConverter.toMap(getTopKey(), value);
-
- for (String field: record.keySet()) {
- if (conversions.containsKey(field)) {
- record.put(field, conversions.get(field).execute(record.get(field)));
- }
- }
-
- return serializedConverter.toValue(record);
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
-
- if (options.containsKey(OPTION_MULTI_DOC)) {
- multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC));
- } else {
- multiDoc = false;
- }
-
- if (!multiDoc) {
- serializedConverter = new CBConverter();
- serializedConverter.init(options);
- }
-
- if (options.containsKey(OPTION_FIELD_END)) {
- fieldEnd = options.get(OPTION_FIELD_END);
- }
-
- if (options.containsKey(OPTION_CONVERSIONS)) {
- Operation[] ops = decodeConversions(options.get(OPTION_CONVERSIONS));
- conversions = new HashMap<String, Operation> ();
-
- for (Operation o: ops) {
- conversions.put(o.getField(), o);
- }
- }
- }
-
- /**
- * Encodes a set of conversion strings for use with the OPTION_CONVERSIONS options. Each conversion
- * string should be in the format 'field op value' (whitespace necessary), where op is +, -, *, /, %, or
- * ^ and the value is a number.
- *
- * @param conversions
- * @return The encoded value to use with OPTION_CONVERSIONS
- */
- public static String encodeConversions(String[] conversions) {
- StringBuilder sb = new StringBuilder();
- boolean first = true;
- for (String conversion: conversions) {
- if (first) {
- first = false;
- } else {
- sb.append("\u0000");
- }
- sb.append(conversion);
- }
- return sb.toString();
- }
-
- public static Operation[] decodeConversions(String conversions) {
- String[] configs = conversions.split("\u0000");
- Operation[] ops = new Operation[configs.length];
-
- for (int i = 0; i < configs.length; i++) {
- ops[i] = new Operation(configs[i]);
- }
-
- return ops;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java
deleted file mode 100644
index 7ec401f..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java
+++ /dev/null
@@ -1,363 +0,0 @@
-// Dear Cloudbase,
-// Use protected fields/methods as much as possible in APIs.
-// Love,
-// Will
-
-// since the IntersectingIterator/FamilyIntersectingIterator classes are stingy with their fields, we have to use
-// the exact same package name to get at currentPartition and currentDocID
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import ss.cloudbase.core.iterators.IntersectingIterator.TermSource;
-
-import cloudbase.core.data.ArrayByteSequence;
-import cloudbase.core.data.ByteSequence;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-
-/**
- * This class is a copy of FamilyIntersectingIterator with a few minor changes. It assumes a table structure like the following:
- * <table>
- * <tr><th>Row</th><th>Column Family</th><th>Column Qualifier</th><th>Value</th></tr>
- * <tr><td>Partition1</td><td>event</td><td>UUID</td><td>The record value</td></tr>
- * <tr><td>Partition1</td><td>index</td><td>term\u0000UUID</td><td></td></tr>
- * </table>
- *
- * @author William Wall
- *
- */
-public class GMDenIntersectingIterator extends IntersectingIterator {
- private static final Logger logger = Logger.getLogger(GMDenIntersectingIterator.class);
-
- public static final Text DEFAULT_INDEX_COLF = new Text("i");
- public static final Text DEFAULT_DOC_COLF = new Text("e");
-
- public static final String indexFamilyOptionName = "indexFamily";
- public static final String docFamilyOptionName = "docFamily";
-
- protected static Text indexColf = DEFAULT_INDEX_COLF;
- protected static Text docColf = DEFAULT_DOC_COLF;
- protected static Set<ByteSequence> indexColfSet;
- protected static Set<ByteSequence> docColfSet;
-
- protected static final byte[] nullByte = {0};
-
- protected SortedKeyValueIterator<Key,Value> docSource;
-
- /**
- * Use this option to retrieve all the documents that match the UUID rather than just the first. This
- * is commonly used in cell-level security models that use the column-qualifier like this:
- * UUID \0 field1 [] value
- * UUID \0 securedField [ALPHA] secretValue
- **/
- public static final String OPTION_MULTI_DOC = "multiDoc";
-
- /**
- * Use this option to turn off document lookup.
- */
- public static final String OPTION_DOC_LOOKUP = "docLookup";
-
- protected boolean multiDoc = false;
- protected boolean doDocLookup = true;
- protected Range docRange = null;
- protected boolean nextId = false;
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- if (options.containsKey(indexFamilyOptionName))
- indexColf = new Text(options.get(indexFamilyOptionName));
- if (options.containsKey(docFamilyOptionName))
- docColf = new Text(options.get(docFamilyOptionName));
- docSource = source.deepCopy(env);
- indexColfSet = Collections.singleton((ByteSequence)new ArrayByteSequence(indexColf.getBytes(),0,indexColf.getLength()));
-
- if (options.containsKey(OPTION_MULTI_DOC)) {
- multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC));
- }
-
- if (options.containsKey(OPTION_DOC_LOOKUP)) {
- doDocLookup = Boolean.parseBoolean(options.get(OPTION_DOC_LOOKUP));
- }
-
- if (!doDocLookup) {
- // it makes no sense to turn on multiDoc if doDocLookup is off
- multiDoc = false;
- }
-
- // remove any range terms
- Text[] originalTerms = decodeColumns(options.get(columnFamiliesOptionName));
- boolean[] originalBooleans = decodeBooleans(options.get(notFlagOptionName));
-
- List<Text> terms = new ArrayList<Text>();
- List<Boolean> termBooleans = new ArrayList<Boolean>();
- List<Text> ranges = new ArrayList<Text>();
- List<Boolean> rangeBooleans = new ArrayList<Boolean>();
-
- boolean boolsExist = originalBooleans != null && originalBooleans.length == originalTerms.length;
-
- for (int i = 0; i < originalTerms.length; i++) {
- if (isRangeTerm(originalTerms[i])) {
- ranges.add(originalTerms[i]);
- if (boolsExist) {
- rangeBooleans.add(originalBooleans[i]);
- } else {
- rangeBooleans.add(false);
- }
- } else {
- terms.add(originalTerms[i]);
-
- if (boolsExist) {
- termBooleans.add(originalBooleans[i]);
- } else {
- termBooleans.add(false);
- }
- }
- }
-
- boolean[] bools = new boolean[termBooleans.size()];
- for (int i = 0; i < termBooleans.size(); i++) {
- bools[i] = termBooleans.get(i).booleanValue();
- }
-
- boolean[] rangeBools = new boolean[rangeBooleans.size()];
- for (int i = 0; i < rangeBooleans.size(); i++) {
- rangeBools[i] = rangeBooleans.get(i).booleanValue();
- }
-
- // put the modified term/boolean lists back in the options
-
- if (terms.size() < 2) {
- // the intersecting iterator will choke on these, so we'll set it up ourselves
- if (terms.size() == 1) {
- sources = new TermSource[1];
- sources[0] = new TermSource(source, terms.get(0));
- }
- } else {
- options.put(columnFamiliesOptionName, encodeColumns(terms.toArray(new Text[terms.size()])));
- if (termBooleans.size() > 0) {
- options.put(notFlagOptionName, encodeBooleans(bools));
- }
-
- super.init(source, options, env);
- }
-
- // add the range terms
- if (ranges.size() > 0) {
-
- TermSource[] localSources;
-
- int offset = 0;
- if (sources != null) {
- localSources = new TermSource[sources.length + ranges.size()];
-
- // copy array
- for (int i = 0; i < sources.length; i++) {
- localSources[i] = sources[i];
- }
-
- offset = sources.length;
- } else {
- localSources = new TermSource[ranges.size()];
- }
-
- for (int i = 0; i < ranges.size(); i++) {
- IntersectionRange ri = new IntersectionRange();
- ri.init(source.deepCopy(env), getRangeIteratorOptions(ranges.get(i)), env);
- localSources[i + offset] = new TermSource(ri, ri.getOutputTerm(), rangeBools[i]);
- }
-
- sources = localSources;
- }
-
- sourcesCount = sources.length;
-
- if (sourcesCount < 2) {
- throw new IOException("GMDenIntersectingIterator requires two or more terms");
- }
-
- docColfSet = Collections.singleton((ByteSequence)new ArrayByteSequence(docColf.getBytes(),0,docColf.getLength()));
- }
-
- @Override
- protected Key buildKey(Text partition, Text term, Text docID) {
- Text colq = new Text(term);
- colq.append(nullByte, 0, 1);
- colq.append(docID.getBytes(), 0, docID.getLength());
- return new Key(partition, indexColf, colq);
- }
-
- @Override
- protected Key buildKey(Text partition, Text term) {
- Text colq = new Text(term);
- return new Key(partition, indexColf, colq);
- }
-
- @Override
- protected Text getTerm(Key key) {
- if (indexColf.compareTo(key.getColumnFamily().getBytes(),0,indexColf.getLength())< 0) {
- // We're past the index column family, so return a term that will sort lexicographically last.
- // The last unicode character should suffice
- return new Text("\uFFFD");
- }
- Text colq = key.getColumnQualifier();
- int zeroIndex = colq.find("\0");
- Text term = new Text();
- term.set(colq.getBytes(),0,zeroIndex);
- return term;
- }
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- GMDenIntersectingIterator newItr = new GMDenIntersectingIterator();
- if(sources != null) {
- newItr.sourcesCount = sourcesCount;
- newItr.sources = new TermSource[sourcesCount];
- for(int i = 0; i < sourcesCount; i++) {
- newItr.sources[i] = new TermSource(sources[i].iter.deepCopy(env), sources[i].term);
- }
- }
- newItr.currentDocID = currentDocID;
- newItr.currentPartition = currentPartition;
- newItr.docRange = docRange;
- newItr.docSource = docSource.deepCopy(env);
- newItr.inclusive = inclusive;
- newItr.multiDoc = multiDoc;
- newItr.nextId = nextId;
- newItr.overallRange = overallRange;
- return newItr;
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
- super.seek(range, indexColfSet, true);
-
- }
-
- @Override
- protected Text getDocID(Key key) {
- Text colq = key.getColumnQualifier();
- int firstZeroIndex = colq.find("\0");
- if (firstZeroIndex < 0) {
- throw new IllegalArgumentException("bad docid: "+key.toString());
- }
- Text docID = new Text();
- try {
- docID.set(colq.getBytes(),firstZeroIndex+1, colq.getBytes().length - firstZeroIndex - 1);
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new IllegalArgumentException("bad indices for docid: "+key.toString()+" "+firstZeroIndex +" " + (colq.getBytes().length - firstZeroIndex - 1));
- }
- return docID;
- }
-
- protected Key buildStartKey() {
- return new Key(currentPartition, docColf, currentDocID);
- }
-
- protected Key buildEndKey() {
- if (multiDoc) {
- return new Key(currentPartition, docColf, new Text(currentDocID.toString() + "\u0000\uFFFD"));
- }
- return null;
- }
-
- @Override
- public void next() throws IOException {
- if (multiDoc && nextId) {
- docSource.next();
-
- // check to make sure that the docSource top is less than our max key
- if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) {
- topKey = docSource.getTopKey();
- value = docSource.getTopValue();
- return;
- }
- }
-
- nextId = false;
- super.next();
- }
-
- @Override
- protected void advanceToIntersection() throws IOException {
- super.advanceToIntersection();
-
- if (topKey==null || !doDocLookup)
- return;
-
- if (logger.isTraceEnabled()) logger.trace("using top key to seek for doc: "+topKey.toString());
- docRange = new Range(buildStartKey(), true, buildEndKey(), false);
- docSource.seek(docRange, docColfSet, true);
- logger.debug("got doc key: "+docSource.getTopKey().toString());
- if (docSource.hasTop()&& docRange.contains(docSource.getTopKey())) {
- value = docSource.getTopValue();
- }
- logger.debug("got doc value: "+value.toString());
-
- if (docSource.hasTop()) {
- if (multiDoc && topKey != null) {
- nextId = true;
- }
- topKey = docSource.getTopKey();
- }
- }
-
-
- public boolean isRangeTerm(Text term) {
- return term.toString().startsWith("range\u0000");
- }
-
- protected Map<String, String> getRangeIteratorOptions(Text config) {
- // we want the keys from Range Iterators to look like this:
- // range|colf|lower|includeLower|upper|includeUpper
- // e.g. range|geo|21332|true|21333|false
-
- // and we'll output a key like this:
- // partition index:geo\0UUID ...
-
-
- String[] range = config.toString().split("\u0000");
- Map<String, String> options = new HashMap<String, String>();
- options.put(IntersectionRange.OPTION_COLF, range[1]);
- options.put(IntersectionRange.OPTION_OUTPUT_TERM, range[1]);
- options.put(IntersectionRange.OPTION_LOWER_BOUND, range[2]);
- options.put(IntersectionRange.OPTION_START_INCLUSIVE, range[3]);
- options.put(IntersectionRange.OPTION_UPPER_BOUND, range[4]);
- options.put(IntersectionRange.OPTION_END_INCLUSIVE, range[5]);
- options.put(IntersectionRange.OPTION_OUTPUT_COLF, indexColf.toString());
- return options;
- }
-
- /**
- * Builds a range term for use with the IntersectingIterator
- * @param colf The column family to search
- * @param start The start of the range
- * @param includeStart Whether the start of the range is inclusive or not
- * @param end The end of the range
- * @param includeEnd Whether the end of the range is inclusive or not
- * @return A String formatted for use as a term a GMDenIntersectingIterator
- */
- public static String getRangeTerm(String colf, String start, boolean includeStart, String end, boolean includeEnd) {
- StringBuilder sb = new StringBuilder();
- sb.append("range\u0000");
- sb.append(colf).append("\u0000");
- sb.append(start).append("\u0000");
- sb.append(includeStart ? "true": "false").append("\u0000");
- sb.append(end).append("\u0000");
- sb.append(includeEnd ? "true": "false").append("\u0000");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java
deleted file mode 100644
index 3b4961f..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java
+++ /dev/null
@@ -1,557 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import cloudbase.core.data.ByteSequence;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.PartialKey;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-import cloudbase.core.util.TextUtil;
-
-public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
-
- protected Text nullText = new Text();
-
- protected Text getPartition(Key key) {
- return key.getRow();
- }
-
- protected Text getTerm(Key key) {
- return key.getColumnFamily();
- }
-
- protected Text getDocID(Key key) {
- return key.getColumnQualifier();
- }
-
- protected Key buildKey(Text partition, Text term) {
- return new Key(partition,(term == null) ? nullText : term);
- }
-
- protected Key buildKey(Text partition, Text term, Text docID) {
- return new Key(partition,(term == null) ? nullText : term, docID);
- }
-
- protected Key buildFollowingPartitionKey(Key key) {
- return key.followingKey(PartialKey.ROW);
- }
-
- protected static final Logger log = Logger.getLogger(IntersectingIterator.class);
-
- protected static class TermSource {
- public SortedKeyValueIterator<Key,Value> iter;
- public Text term;
- public boolean notFlag;
-
- public TermSource(TermSource other) {
- this.iter = other.iter;
- this.term = other.term;
- this.notFlag = other.notFlag;
- }
-
- public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
- this.iter = iter;
- this.term = term;
- this.notFlag = false;
- }
- public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
- this.iter = iter;
- this.term = term;
- this.notFlag = notFlag;
- }
-
- public String getTermString() {
- return (this.term == null) ? new String("Iterator") : this.term.toString();
- }
- }
-
- protected TermSource[] sources;
- protected int sourcesCount = 0;
-
- protected Range overallRange;
-
- // query-time settings
- protected Text currentPartition = null;
- protected Text currentDocID = new Text(emptyByteArray);
- protected static final byte [] emptyByteArray = new byte[0];
-
- protected Key topKey = null;
- protected Value value = new Value(emptyByteArray);
-
- protected Collection<ByteSequence> seekColumnFamilies;
-
- protected boolean inclusive;
-
-
- public IntersectingIterator()
- {}
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new IntersectingIterator(this, env);
- }
-
- public IntersectingIterator(IntersectingIterator other, IteratorEnvironment env)
- {
- if(other.sources != null)
- {
- sourcesCount = other.sourcesCount;
- sources = new TermSource[sourcesCount];
- for(int i = 0; i < sourcesCount; i++)
- {
- sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].term);
- }
- }
- }
-
- @Override
- public Key getTopKey() {
- return topKey;
- }
-
- @Override
- public Value getTopValue() {
- // we don't really care about values
- return value;
- }
-
- @Override
- public boolean hasTop() {
- return currentPartition != null;
- }
-
- // precondition: currentRow is not null
- private boolean seekOneSource(int sourceID) throws IOException
- {
- // find the next key in the appropriate column family that is at or beyond the cursor (currentRow, currentCQ)
- // advance the cursor if this source goes beyond it
- // return whether we advanced the cursor
-
- // within this loop progress must be made in one of the following forms:
- // - currentRow or currentCQ must be increased
- // - the given source must advance its iterator
- // this loop will end when any of the following criteria are met
- // - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentCQ)
- // - the given source is out of data and currentRow is set to null
- // - the given source has advanced beyond the endRow and currentRow is set to null
- boolean advancedCursor = false;
-
- if (sources[sourceID].notFlag)
- {
- while(true)
- {
- if(sources[sourceID].iter.hasTop() == false)
- {
- // an empty column that you are negating is a valid condition
- break;
- }
- // check if we're past the end key
- int endCompare = -1;
- // we should compare the row to the end of the range
- if(overallRange.getEndKey() != null)
- {
- endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
- if((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0)
- {
- // an empty column that you are negating is a valid condition
- break;
- }
- }
- int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
- // check if this source is already at or beyond currentRow
- // if not, then seek to at least the current row
-
- if(partitionCompare > 0)
- {
- // seek to at least the currentRow
- Key seekKey = buildKey(currentPartition,sources[sourceID].term);
- sources[sourceID].iter.seek(new Range(seekKey,true, null, false), seekColumnFamilies, inclusive);
- continue;
- }
- // check if this source has gone beyond currentRow
- // if so, this is a valid condition for negation
- if(partitionCompare < 0)
- {
- break;
- }
- // we have verified that the current source is positioned in currentRow
- // now we must make sure we're in the right columnFamily in the current row
- // Note: Iterators are auto-magically set to the correct columnFamily
- if(sources[sourceID].term != null)
- {
- int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
- // check if this source is already on the right columnFamily
- // if not, then seek forwards to the right columnFamily
- if(termCompare > 0)
- {
- Key seekKey = buildKey(currentPartition,sources[sourceID].term,currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey,true,null,false), seekColumnFamilies, inclusive);
- continue;
- }
- // check if this source is beyond the right columnFamily
- // if so, then this is a valid condition for negating
- if(termCompare < 0)
- {
- break;
- }
- }
-
- // we have verified that we are in currentRow and the correct column family
- // make sure we are at or beyond columnQualifier
- Text docID = getDocID(sources[sourceID].iter.getTopKey());
- int docIDCompare = currentDocID.compareTo(docID);
- // If we are past the target, this is a valid result
- if(docIDCompare < 0)
- {
- break;
- }
- // if this source is not yet at the currentCQ then advance in this source
- if(docIDCompare > 0)
- {
- // seek forwards
- Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- continue;
- }
- // if we are equal to the target, this is an invalid result.
- // Force the entire process to go to the next row.
- // We are advancing column 0 because we forced that column to not contain a !
- // when we did the init()
- if(docIDCompare == 0)
- {
- sources[0].iter.next();
- advancedCursor = true;
- break;
- }
- }
- }
- else
- {
- while(true)
- {
- if(sources[sourceID].iter.hasTop() == false)
- {
- currentPartition = null;
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
- // check if we're past the end key
- int endCompare = -1;
- // we should compare the row to the end of the range
-
- if(overallRange.getEndKey() != null)
- {
- endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
- if((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0)
- {
- currentPartition = null;
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
- }
- int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
- // check if this source is already at or beyond currentRow
- // if not, then seek to at least the current row
- if(partitionCompare > 0)
- {
- // seek to at least the currentRow
- Key seekKey = buildKey(currentPartition,sources[sourceID].term);
- sources[sourceID].iter.seek(new Range(seekKey,true, null, false), seekColumnFamilies, inclusive);
- continue;
- }
- // check if this source has gone beyond currentRow
- // if so, advance currentRow
- if(partitionCompare < 0)
- {
- currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
- currentDocID.set(emptyByteArray);
- advancedCursor = true;
- continue;
- }
- // we have verified that the current source is positioned in currentRow
- // now we must make sure we're in the right columnFamily in the current row
- // Note: Iterators are auto-magically set to the correct columnFamily
-
- if(sources[sourceID].term != null)
- {
- int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
- // check if this source is already on the right columnFamily
- // if not, then seek forwards to the right columnFamily
- if(termCompare > 0)
- {
- Key seekKey = buildKey(currentPartition,sources[sourceID].term,currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey,true,null,false), seekColumnFamilies, inclusive);
- continue;
- }
- // check if this source is beyond the right columnFamily
- // if so, then seek to the next row
- if(termCompare < 0)
- {
- // we're out of entries in the current row, so seek to the next one
- // byte[] currentRowBytes = currentRow.getBytes();
- // byte[] nextRow = new byte[currentRowBytes.length + 1];
- // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length);
- // nextRow[currentRowBytes.length] = (byte)0;
- // // we should reuse text objects here
- // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID]));
- if(endCompare == 0)
- {
- // we're done
- currentPartition = null;
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
- Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
- try {
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- } catch (Exception e) {
- // the seek will throw an exception if we have crossed a tablet boundary
- // setting the Partition to null will advance to the next tablet
- currentPartition = null;
- return true;
- }
- continue;
- }
- }
- // we have verified that we are in currentRow and the correct column family
- // make sure we are at or beyond columnQualifier
- Text docID = getDocID(sources[sourceID].iter.getTopKey());
- int docIDCompare = currentDocID.compareTo(docID);
- // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
- if(docIDCompare < 0)
- {
- currentDocID.set(docID);
- advancedCursor = true;
- break;
- }
- // if this source is not yet at the currentCQ then seek in this source
- if(docIDCompare > 0)
- {
- // seek forwards
- Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
- sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
- continue;
- }
- // this source is at the current row, in its column family, and at currentCQ
- break;
- }
- }
- return advancedCursor;
- }
-
- @Override
- public void next() throws IOException {
- if(currentPartition == null)
- {
- return;
- }
- // precondition: the current row is set up and the sources all have the same column qualifier
- // while we don't have a match, seek in the source with the smallest column qualifier
- sources[0].iter.next();
- advanceToIntersection();
- }
-
- protected void advanceToIntersection() throws IOException
- {
- boolean cursorChanged = true;
- int numSeeks = 0;
- while(cursorChanged)
- {
- // seek all of the sources to at least the highest seen column qualifier in the current row
- cursorChanged = false;
- for(int i = 0; i < sourcesCount; i++)
- {
- if(currentPartition == null)
- {
- topKey = null;
- return;
- }
- numSeeks++;
- if(seekOneSource(i))
- {
- cursorChanged = true;
- break;
- }
- }
- }
- topKey = buildKey(currentPartition,nullText,currentDocID);
- }
-
- public static String stringTopKey(SortedKeyValueIterator<Key, Value> iter) {
- if (iter.hasTop())
- return iter.getTopKey().toString();
- return "";
- }
-
- public static final String columnFamiliesOptionName = "columnFamilies";
- public static final String notFlagOptionName = "notFlag";
-
- public static String encodeColumns(Text[] columns)
- {
- StringBuilder sb = new StringBuilder();
- for(int i = 0; i < columns.length; i++)
- {
- sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
- sb.append('\n');
- }
- return sb.toString();
- }
-
- public static String encodeBooleans(boolean[] flags)
- {
- byte[] bytes = new byte[flags.length];
- for(int i = 0; i < flags.length; i++)
- {
- if(flags[i])
- bytes[i] = 1;
- else
- bytes[i] = 0;
- }
- return new String(Base64.encodeBase64(bytes));
- }
-
- public static Text[] decodeColumns(String columns)
- {
- String[] columnStrings = columns.split("\n");
- Text[] columnTexts = new Text[columnStrings.length];
- for(int i = 0; i < columnStrings.length; i++)
- {
- columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
- }
- return columnTexts;
- }
-
- public static boolean[] decodeBooleans(String flags)
- {
- // return null of there were no flags
- if(flags == null)
- return null;
-
- byte[] bytes = Base64.decodeBase64(flags.getBytes());
- boolean[] bFlags = new boolean[bytes.length];
- for(int i = 0; i < bytes.length; i++)
- {
- if(bytes[i] == 1)
- bFlags[i] = true;
- else
- bFlags[i] = false;
- }
- return bFlags;
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source,
- Map<String, String> options, IteratorEnvironment env) throws IOException {
- Text[] terms = decodeColumns(options.get(columnFamiliesOptionName));
- boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName));
-
- if(terms.length < 2)
- {
- throw new IOException("IntersectionIterator requires two or more columns families");
- }
-
- // Scan the not flags.
- // There must be at least one term that isn't negated
- // And we are going to re-order such that the first term is not a ! term
- if(notFlag == null)
- {
- notFlag = new boolean[terms.length];
- for(int i = 0; i < terms.length; i++)
- notFlag[i] = false;
- }
- if(notFlag[0]) {
- for(int i = 1; i < notFlag.length; i++)
- {
- if(notFlag[i] == false)
- {
- Text swapFamily = new Text(terms[0]);
- terms[0].set(terms[i]);
- terms[i].set(swapFamily);
- notFlag[0] = false;
- notFlag[i] = true;
- break;
- }
- }
- if(notFlag[0])
- {
- throw new IOException("IntersectionIterator requires at lest one column family without not");
- }
- }
-
-
- sources = new TermSource[terms.length];
- sources[0] = new TermSource(source, terms[0]);
- for(int i = 1; i < terms.length; i++)
- {
- sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]);
- }
- sourcesCount = terms.length;
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
- overallRange = new Range(range);
- currentPartition = new Text();
- currentDocID.set(emptyByteArray);
-
- this.seekColumnFamilies = seekColumnFamilies;
- this.inclusive = inclusive;
-
- // seek each of the sources to the right column family within the row given by key
- for(int i = 0; i < sourcesCount; i++)
- {
- Key sourceKey;
- if(range.getStartKey() != null)
- {
- if(range.getStartKey().getColumnQualifier() != null)
- {
- sourceKey = buildKey(getPartition(range.getStartKey()),sources[i].term,range.getStartKey().getColumnQualifier());
- }
- else
- {
- sourceKey = buildKey(getPartition(range.getStartKey()),sources[i].term);
- }
- sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive);
- }
- else
- {
- sources[i].iter.seek(range, seekColumnFamilies, inclusive);
- }
- }
- advanceToIntersection();
- }
-
- public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env,
- Text term, boolean notFlag) {
- // Check if we have space for the added Source
- if(sources == null)
- {
- sources = new TermSource[1];
- }
- else
- {
- // allocate space for node, and copy current tree.
- // TODO: Should we change this to an ArrayList so that we can just add() ?
- TermSource[] localSources = new TermSource[sources.length + 1];
- int currSource = 0;
- for(TermSource myTerm : sources)
- {
- // TODO: Do I need to call new here? or can I just re-use the term?
- localSources[currSource] = new TermSource(myTerm);
- currSource++;
- }
- sources = localSources;
- }
- sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag);
- sourcesCount++;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java
deleted file mode 100644
index 04d5884..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java
+++ /dev/null
@@ -1,330 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import cloudbase.core.client.CBException;
-import cloudbase.core.data.ArrayByteSequence;
-import cloudbase.core.data.ByteSequence;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.PartialKey;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-
-/**
- * When attempting to intersect a term which is a range (lowerval <= x <= upperval), the entire range
- * must first be scanned so that the document keys can be sorted before passing them up to the
- * intersecting iterator of choice.
- *
- * @author William Wall (wawall)
- */
-public class IntersectionRange implements SortedKeyValueIterator<Key, Value>{
- private static final Logger logger = Logger.getLogger(IntersectionRange.class);
-
- public static final String OPTION_OUTPUT_COLF = "outputColf";
- public static final String OPTION_OUTPUT_TERM = "outputTerm";
- public static final String OPTION_COLF = "columnFamily";
- public static final String OPTION_LOWER_BOUND = "lower";
- public static final String OPTION_UPPER_BOUND = "upper";
- public static final String OPTION_DELIMITER = "delimiter";
- public static final String OPTION_START_INCLUSIVE = "startInclusive";
- public static final String OPTION_END_INCLUSIVE = "endInclusive";
- public static final String OPTION_TEST_OUTOFMEM = "testOutOfMemory";
-
- protected SortedKeyValueIterator<Key, Value> source;
- protected Text colf = null;
- protected Text lower = null;
- protected Text upper = null;
- protected String delimiter = null;
- protected String outputTerm = null;
- protected Text outputColf = null;
- protected Text currentPartition = null;
- protected boolean startInclusive = true;
- protected boolean endInclusive = false;
- protected boolean testOutOfMemory = false;
-
- protected Key topKey = null;
-
- protected Iterator<Key> itr;
- protected boolean sortComplete = false;
- protected Range overallRange;
- protected SortedSet<Key> docIds = new TreeSet<Key>();
- protected static Set<ByteSequence> indexColfSet;
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- return new IntersectionRange(this, env);
- }
-
- public IntersectionRange() {
- logger.setLevel(Level.ALL);
- }
-
- public IntersectionRange(IntersectionRange other, IteratorEnvironment env) {
- source = other.source.deepCopy(env);
- colf = other.colf;
- lower = other.lower;
- upper = other.upper;
- delimiter = other.delimiter;
- outputColf = other.outputColf;
- outputTerm = other.outputTerm;
- currentPartition = other.currentPartition;
- startInclusive = other.startInclusive;
- endInclusive = other.endInclusive;
- topKey = other.topKey;
- docIds.addAll(other.docIds);
- itr = docIds.iterator();
- sortComplete = other.sortComplete;
- overallRange = other.overallRange;
- }
-
- public Text getOutputTerm() {
- return new Text(outputTerm);
- }
-
- public Text getOutputColumnFamily() {
- return outputColf;
- }
-
- @Override
- public Key getTopKey() {
- return topKey;
- }
-
- @Override
- public Value getTopValue() {
- return IteratorConstants.emptyValue;
- }
-
- @Override
- public boolean hasTop() {
- try {
- if (topKey == null) next();
- } catch (IOException e) {
-
- }
-
- return topKey != null;
- }
-
- protected String getDocID(Key key) {
- try {
- String s = key.getColumnQualifier().toString();
- int start = s.indexOf("\u0000") + 1;
- int end = s.indexOf("\u0000", start);
- if (end == -1) {
- end = s.length();
- }
- return s.substring(start, end);
- } catch (Exception e) {
-
- }
- return null;
- }
-
- protected Text getTerm(Key key) {
- try {
- Text colq = key.getColumnQualifier();
- Text term = new Text();
- term.set(colq.getBytes(), 0, colq.find("\0"));
- return term;
- } catch (Exception e) {
- }
- return null;
- }
-
- protected Text getPartition(Key key) {
- return key.getRow();
- }
-
- protected Text getFollowingPartition(Key key) {
- return key.followingKey(PartialKey.ROW).getRow();
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- if (options.containsKey(OPTION_LOWER_BOUND)) {
- lower = new Text(options.get(OPTION_LOWER_BOUND));
- } else {
- lower = new Text("\u0000");
- }
-
- if (options.containsKey(OPTION_UPPER_BOUND)) {
- upper = new Text(options.get(OPTION_UPPER_BOUND));
- } else {
- upper = new Text("\u0000");
- }
-
- if (options.containsKey(OPTION_DELIMITER)) {
- delimiter = options.get(OPTION_DELIMITER);
- } else {
- delimiter = "\u0000";
- }
-
- if (options.containsKey(OPTION_COLF)) {
- colf = new Text(options.get(OPTION_COLF));
- } else {
- colf = new Text("index");
- }
-
- if (options.containsKey(OPTION_OUTPUT_COLF)) {
- outputColf = new Text(options.get(OPTION_OUTPUT_COLF));
- } else {
- outputColf = colf;
- }
-
- if (options.containsKey(OPTION_START_INCLUSIVE)) {
- startInclusive = Boolean.parseBoolean(options.get(OPTION_START_INCLUSIVE));
- }
-
- if (options.containsKey(OPTION_END_INCLUSIVE)) {
- endInclusive = Boolean.parseBoolean(options.get(OPTION_END_INCLUSIVE));
- }
-
- if (options.containsKey(OPTION_TEST_OUTOFMEM)) {
- testOutOfMemory = Boolean.parseBoolean(options.get(OPTION_TEST_OUTOFMEM));
- }
-
- outputTerm = options.get(OPTION_OUTPUT_TERM);
- this.source = source;
-
- indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(colf.getBytes(),0,colf.getLength()));
- }
-
- /**
- * Sets up the document/record IDs in a sorted structure.
- * @throws IOException
- * @throws CBException
- */
- protected void setUpDocIds() throws IOException {
- int count = 0;
- try {
- if (testOutOfMemory) {
- throw new OutOfMemoryError();
- }
-
- long start = System.currentTimeMillis();
- if (source.hasTop()) {
- docIds.clear();
- currentPartition = getPartition(source.getTopKey());
- while (currentPartition != null) {
- Key lowerKey = new Key(currentPartition, colf, lower);
- try {
- source.seek(new Range(lowerKey, true, null, false), indexColfSet, true);
- } catch (IllegalArgumentException e) {
- // the range does not overlap the overall range? quit
- currentPartition = null;
- break;
- }
-
- // if we don't have a value then quit
- if (!source.hasTop()) {
- currentPartition = null;
- break;
- }
-
- Key top;
- while(source.hasTop()) {
- top = source.getTopKey();
-
- if (overallRange != null && overallRange.getEndKey() != null) {
- // see if we're past the end of the partition range
- int endCompare = overallRange.getEndKey().compareTo(top, PartialKey.ROW);
- if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
- // we're done
- currentPartition = null;
- break;
- }
- }
-
- // make sure we're still in the right partition
- if (currentPartition.compareTo(getPartition(top)) < 0) {
- currentPartition.set(getPartition(top));
- break;
- }
-
- // make sure we're still in the right column family
- if (colf.compareTo(top.getColumnFamily()) < 0) {
- // if not, then get the next partition
- currentPartition = getFollowingPartition(top);
- break;
- }
-
- Text term = getTerm(top);
- int lowerCompare = term.compareTo(lower);
- int upperCompare = term.compareTo(upper);
-
- // if we went past the upper bound, jump to the next partition
- if ((endInclusive && upperCompare > 0) || (!endInclusive && upperCompare >= 0)) {
- currentPartition = getFollowingPartition(top);
- break;
- } else if ((startInclusive && lowerCompare >= 0) || (!startInclusive && lowerCompare > 0)) {
- // if the term is lexicographically between the upper and lower bounds,
- // then add the doc ID
- docIds.add(buildOutputKey(top));
- count++;
- }
- source.next();
-
- // make sure we check to see if we're at the end before potentially seeking back
- if (!source.hasTop()) {
- currentPartition = null;
- break;
- }
- }
- }
- itr = docIds.iterator();
- sortComplete = true;
- logger.debug("setUpDocIds completed for " + lower + "<=" + colf + "<=" + upper + " in " + (System.currentTimeMillis() - start) + " ms. Count = " + count);
- } else {
- logger.warn("There appear to be no records on this tablet");
- }
- } catch (OutOfMemoryError e) {
- logger.warn("OutOfMemory error: Count = " + count);
- throw new IOException("OutOfMemory error while sorting keys");
- }
- }
-
- protected Key buildOutputKey(Key key) {
- String id = getDocID(key);
- return new Key(currentPartition, outputColf, new Text((outputTerm != null ? outputTerm: colf.toString()) + "\u0000" +id));
- }
-
- @Override
- public void next() throws IOException {
- if (itr != null && itr.hasNext()) {
- topKey = itr.next();
- } else {
- topKey = null;
- }
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> colfs, boolean inclusive) throws IOException {
- if (!sortComplete) {
- overallRange = range;
- source.seek(range, colfs, inclusive);
- setUpDocIds();
- }
-
- if (range.getStartKey() != null) {
- while (hasTop() && topKey.compareTo(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL) < 0) {
- next();
- }
- } else {
- next();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java
deleted file mode 100644
index 0db50f6..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import org.apache.hadoop.io.Text;
-
-import cloudbase.core.data.Value;
-
-public class IteratorConstants {
- public static final byte[] emptyByteArray = new byte[0];
- public static final Value emptyValue = new Value(emptyByteArray);
- public static final Text emptyText = new Text(emptyByteArray);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java
deleted file mode 100644
index c25cc72..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java
+++ /dev/null
@@ -1,173 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import cloudbase.core.data.Key;
-import cloudbase.core.data.PartialKey;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-
-/**
- * Iterates over the minimum value of every term with the given prefix and parts delimeter. If, for example, you
- * wanted to find each person's last known position, you would set up the following index:
- *
- * We want the last date instead of the first, so we'll use reverseDate in our index
- * partitionX index:<prefix>_<personID>_<reverseDate>.<recordID>
- *
- * (where "." is actually "\u0000")
- *
- * <code>SortedMinIterator</code> initially seeks to index:prefix in the first partition. From there, it grabs the record
- * as the "document" and then seeks to index:<whatever-the-term-was-up-to-last-delimiter> + "\uFFFD" (last unicode
- * character), which then puts it at the next persion ID in our example.
- *
- * NOTE that this iterator gives a unique result per tablet server. You may have to process the results to determine
- * the true minimum value.
- *
- * @author William Wall (wawall)
- */
-public class SortedMinIterator extends SortedRangeIterator {
- private static final Logger logger = Logger.getLogger(SortedMinIterator.class);
-
- /**
- * The option to supply a prefix to the term combination. Defaults to "min"
- */
- public static final String OPTION_PREFIX = "prefix";
-
- /**
- * The delimiter for the term (note that this is and must be different than the delimiter between the term and record ID). Defaults to "_"
- */
- public static final String OPTION_PARTS_DELIMITER = "partsDelimiter";
-
- protected String prefix = "min";
- protected String partsDelimiter = "_";
- protected boolean firstKey = true;
- protected String lastPart = null;
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
-
- prefix = options.get(OPTION_PREFIX);
- String s = options.get(OPTION_PARTS_DELIMITER);
- partsDelimiter = s != null ? s: "_";
- //TODO: make sure prefix and partsDelimeter is set
- lower = new Text(prefix);
- }
-
- protected String getPrefix(Key key) {
- String s = key.getColumnQualifier().toString();
- int i = s.indexOf(partsDelimiter);
- if (i > 0) {
- return s.substring(0, i + partsDelimiter.length());
- }
- return null;
- }
-
- protected String getPart(Key key) {
- String s = key.getColumnQualifier().toString();
- int i = s.lastIndexOf(partsDelimiter);
- if (i > 0) {
- return s.substring(0, i + 1);
- }
- return null;
- }
-
- @Override
- protected void setUpDocIds() throws IOException {
- int count = 0;
- try {
- if (testOutOfMemory) {
- throw new OutOfMemoryError();
- }
-
- long start = System.currentTimeMillis();
- if (source.hasTop()) {
- SortedSet<Key> docIds = new TreeSet<Key>();
- currentPartition = getPartition(source.getTopKey());
- while (currentPartition != null) {
- // seek to the prefix (aka lower)
- Key lowerKey = new Key(currentPartition, colf, lower);
- source.seek(new Range(lowerKey, true, null, false), indexColfSet, true);
-
- // if we don't have a value then quit
- if (!source.hasTop()) {
- currentPartition = null;
- }
-
- Key top;
- while(source.hasTop()) {
- top = source.getTopKey();
-
- if (overallRange != null && overallRange.getEndKey() != null) {
- // see if we're past the end of the partition range
- int endCompare = overallRange.getEndKey().compareTo(top, PartialKey.ROW);
- if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
- // we're done
- currentPartition = null;
- break;
- }
- }
-
- // make sure we're still in the right partition
- if (currentPartition.compareTo(getPartition(top)) < 0) {
- currentPartition.set(getPartition(top));
- break;
- }
-
- // make sure we're still in the right column family
- if (colf.compareTo(top.getColumnFamily()) < 0) {
- // if not, then get the next partition
- currentPartition = getFollowingPartition(top);
- break;
- }
-
- // make sure we're still in the index prefix
- String p = getPrefix(top);
- String part = getPart(top);
-
- if (p != null && p.startsWith(prefix)) {
- if (part != null) {
- if (!part.equals(lastPart)) {
- // if the part (e.g. "lastPosition_personId_") is different, then it's valid
- lastPart = part;
- docIds.add(buildOutputKey(top));
- count++;
- }
-
- // seek to the next part
- lowerKey = new Key(currentPartition, colf, new Text(part + "\uFFFD"));
- source.seek(new Range(lowerKey, true, null, false), indexColfSet, true);
- }
- } else {
- // we're done in this partition
- currentPartition = getFollowingPartition(top);
- break;
- }
-
- // make sure we check to see if we're at the end before potentially seeking back
- if (!source.hasTop()) {
- currentPartition = null;
- break;
- }
- }
- }
- itr = docIds.iterator();
- sortComplete = true;
- logger.debug("setUpDocIds completed in " + (System.currentTimeMillis() - start) + " ms. Count = " + count);
- } else {
- logger.warn("There appear to be no records on this tablet");
- }
- } catch (OutOfMemoryError e) {
- logger.warn("OutOfMemory error: Count = " + count);
- throw new IOException("OutOfMemory error while sorting keys");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java
deleted file mode 100644
index 4541230..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import cloudbase.core.data.ArrayByteSequence;
-import cloudbase.core.data.ByteSequence;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.PartialKey;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-
-/**
- * <code>SortedRangeIterator</code> uses the insertion sort functionality of <code>IntersectionRange</code>
- * to store off document keys rather than term keys.
- *
- * @author William Wall (wawall)
- */
-public class SortedRangeIterator extends IntersectionRange {
- private static final Logger logger = Logger.getLogger(SortedRangeIterator.class);
-
- /** Use this option to set the document column family. Defaults to "event". **/
- public static final String OPTION_DOC_COLF = "docColf";
-
- /**
- * Use this option to retrieve all the documents that match the UUID rather than just the first. This
- * is commonly used in cell-level security models that use the column-qualifier like this:
- * UUID \0 field1 [] value
- * UUID \0 securedField [ALPHA] secretValue
- **/
- public static final String OPTION_MULTI_DOC = "multiDoc";
-
- /** The source document iterator **/
- protected SortedKeyValueIterator<Key, Value> docSource;
-
- /** The document column family. Defaults to "event". **/
- protected Text docColf;
- protected Value docValue;
-
- protected boolean nextId = false;
- protected Range docRange = null;
- protected boolean multiDoc;
-
- protected Set<ByteSequence> docColfSet;
-
- @Override
- public void next() throws IOException {
- if (multiDoc && nextId) {
- docSource.next();
-
- // check to make sure that the docSource top is less than our max key
- if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) {
- topKey = docSource.getTopKey();
- docValue = docSource.getTopValue();
- return;
- }
- }
-
- super.next();
-
- // if we're looking for multiple documents in the doc source, then
- // set the max key for our range check
- if (topKey != null) {
- Text row = topKey.getRow();
- Text colf = topKey.getColumnFamily();
- if (multiDoc) {
- docRange = new Range(
- new Key (row, colf, new Text(topKey.getColumnQualifier().toString())),
- true,
- new Key (row, colf, new Text(topKey.getColumnQualifier().toString() + "\u0000\uFFFD")),
- true
- );
- } else {
- docRange = new Range(new Key (row, colf, new Text(topKey.getColumnQualifier().toString())),true, null, false);
- }
- }
-
- nextId = false;
- getDocument();
- }
-
- @Override
- public Value getTopValue() {
- return docValue;
- }
-
- @Override
- protected Key buildOutputKey(Key key) {
- // we want to build the document key as the output key
- return new Key(currentPartition, docColf, new Text(getDocID(key)));
- }
-
- protected void getDocument() throws IOException {
- // look up the document value
- if (topKey != null) {
- docSource.seek(docRange, docColfSet, true);
-
- if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) {
- // found it!
- topKey = docSource.getTopKey();
- docValue = docSource.getTopValue();
- nextId = true;
- } else {
- // does not exist or user had auths that could see the index but not the event
- logger.warn("Document: " + topKey + " does not exist or user had auths for " + colf + " but not " + docColf);
- docValue = IteratorConstants.emptyValue;
- }
- }
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- docSource = source.deepCopy(env);
- if (options.containsKey(OPTION_DOC_COLF)) {
- docColf = new Text(options.get(OPTION_DOC_COLF));
- } else {
- docColf = new Text("event");
- }
-
- if (options.containsKey(OPTION_MULTI_DOC)) {
- multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC));
- } else {
- multiDoc = false;
- }
-
- docColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(docColf.getBytes(), 0, docColf.getLength()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java
----------------------------------------------------------------------
diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java
deleted file mode 100644
index 2111bbd..0000000
--- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package ss.cloudbase.core.iterators;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-import cloudbase.core.data.ByteSequence;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.PartialKey;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SkippingIterator;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-import cloudbase.core.iterators.WrappingIterator;
-
-/**
- * This iterator gets unique keys by the given depth. The depth defaults to PartialKey.ROW_COLFAM.
- *
- * @author William Wall
- */
-public class UniqueIterator extends WrappingIterator {
- public static final String OPTION_DEPTH = "depth";
- private static final Collection<ByteSequence> EMPTY_SET = Collections.emptySet();
- protected PartialKey depth;
- protected Range range;
- protected Key lastKey = null;
-
- public UniqueIterator() {}
-
- public UniqueIterator(UniqueIterator other) {
- this.depth = other.depth;
- this.range = other.range;
- this.lastKey = other.lastKey;
- }
-
- @Override
- public void next() throws IOException {
- consume();
- }
-
- protected void consume() throws IOException {
- if (lastKey != null) {
- int count = 0;
- // next is way faster, so we'll try doing that 10 times before seeking
- while (getSource().hasTop() && getSource().getTopKey().compareTo(lastKey, depth) == 0 && count < 10) {
- getSource().next();
- count++;
- }
- if (getSource().hasTop() && getSource().getTopKey().compareTo(lastKey, depth) == 0) {
- reseek(getSource().getTopKey().followingKey(depth));
- }
- }
-
- if (getSource().hasTop()) {
- lastKey = getSource().getTopKey();
- }
- }
-
- protected void reseek(Key key) throws IOException {
- if (range.afterEndKey(key)) {
- range = new Range(range.getEndKey(), true, range.getEndKey(), range.isEndKeyInclusive());
- } else {
- range = new Range(key, true, range.getEndKey(), range.isEndKeyInclusive());
- }
- getSource().seek(range, EMPTY_SET, false);
- }
-
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
-
- if (options.containsKey(OPTION_DEPTH)) {
- depth = PartialKey.getByDepth(Integer.parseInt(options.get(OPTION_DEPTH)));
- } else {
- depth = PartialKey.ROW_COLFAM;
- }
- }
-
- @Override
- public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
- UniqueIterator u = new UniqueIterator(this);
- u.setSource(getSource().deepCopy(env));
- return u;
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
- this.range = range;
- getSource().seek(range, columnFamilies, inclusive);
- consume();
- }
-}