You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2011/12/05 21:05:51 UTC
svn commit: r1210600 [16/16] - in
/incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/
ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/
ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/
ingest/src/main/java/protobuf/...
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package util;
import java.util.HashMap;
@@ -21,68 +21,57 @@ import java.util.Map;
import org.apache.accumulo.core.data.Key;
-public class BaseKeyParser
-{
- public static final String ROW_FIELD = "row";
- public static final String COLUMN_FAMILY_FIELD = "columnFamily";
- public static final String COLUMN_QUALIFIER_FIELD = "columnQualifier";
-
- protected Map <String, String> keyFields = new HashMap <String, String> ();
- protected Key key = null;
-
- /**
- * Parses a Key object into its constituent fields. This method
- * clears any prior values, so the object can be reused
- * without requiring a new instantiation. This default implementation
- * makes the row, columnFamily, and columnQualifier available.
- *
- * @param key
- */
- public void parse (Key key)
- {
- this.key = key;
-
- keyFields.clear();
-
- keyFields.put (ROW_FIELD, key.getRow().toString());
- keyFields.put (COLUMN_FAMILY_FIELD, key.getColumnFamily().toString());
- keyFields.put (COLUMN_QUALIFIER_FIELD, key.getColumnQualifier().toString());
- }
-
- public String getFieldValue (String fieldName)
- {
- return keyFields.get(fieldName);
- }
-
- public String[] getFieldNames()
- {
- String[] fieldNames = new String[keyFields.size()];
- return keyFields.keySet().toArray(fieldNames);
- }
-
- public BaseKeyParser duplicate ()
- {
- return new BaseKeyParser();
- }
-
- public String getRow()
- {
- return keyFields.get(ROW_FIELD);
- }
-
- public String getColumnFamily ()
- {
- return keyFields.get(COLUMN_FAMILY_FIELD);
- }
-
- public String getColumnQualifier()
- {
- return keyFields.get(COLUMN_QUALIFIER_FIELD);
- }
-
- public Key getKey()
- {
- return this.key;
- }
-
+public class BaseKeyParser {
+ public static final String ROW_FIELD = "row";
+ public static final String COLUMN_FAMILY_FIELD = "columnFamily";
+ public static final String COLUMN_QUALIFIER_FIELD = "columnQualifier";
+
+ protected Map<String,String> keyFields = new HashMap<String,String>();
+ protected Key key = null;
+
+ /**
+ * Parses a Key object into its constituent fields. This method clears any prior values, so the object can be reused without requiring a new instantiation.
+ * This default implementation makes the row, columnFamily, and columnQualifier available.
+ *
+ * @param key
+ */
+ public void parse(Key key) {
+ this.key = key;
+
+ keyFields.clear();
+
+ keyFields.put(ROW_FIELD, key.getRow().toString());
+ keyFields.put(COLUMN_FAMILY_FIELD, key.getColumnFamily().toString());
+ keyFields.put(COLUMN_QUALIFIER_FIELD, key.getColumnQualifier().toString());
+ }
+
+ public String getFieldValue(String fieldName) {
+ return keyFields.get(fieldName);
+ }
+
+ public String[] getFieldNames() {
+ String[] fieldNames = new String[keyFields.size()];
+ return keyFields.keySet().toArray(fieldNames);
+ }
+
+ public BaseKeyParser duplicate() {
+ return new BaseKeyParser();
+ }
+
+ public String getRow() {
+ return keyFields.get(ROW_FIELD);
+ }
+
+ public String getColumnFamily() {
+ return keyFields.get(COLUMN_FAMILY_FIELD);
+ }
+
+ public String getColumnQualifier() {
+ return keyFields.get(COLUMN_QUALIFIER_FIELD);
+ }
+
+ public Key getKey() {
+ return this.key;
+ }
+
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java Mon Dec 5 20:05:49 2011
@@ -1,78 +1,71 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package util;
import org.apache.accumulo.core.data.Key;
public class FieldIndexKeyParser extends KeyParser {
-
- public static final String DELIMITER = "\0";
- @Override
- public void parse(Key key)
- {
- super.parse (key);
-
- String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER);
- this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : "");
-
- String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER);
- this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : "");
- this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : "");
- this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : "");
- }
-
- @Override
- public BaseKeyParser duplicate ()
- {
- return new FieldIndexKeyParser();
- }
-
- @Override
- public String getSelector()
- {
- return keyFields.get(SELECTOR_FIELD);
- }
-
- @Override
- public String getDataType()
- {
- return keyFields.get(DATATYPE_FIELD);
- }
-
- @Override
- public String getFieldName ()
- {
- return keyFields.get(FIELDNAME_FIELD);
- }
-
- @Override
- public String getUid()
- {
- return keyFields.get(UID_FIELD);
- }
-
- public String getDataTypeUid()
- {
- return getDataType()+DELIMITER+getUid();
- }
-
- // An alias for getSelector
- public String getFieldValue()
- {
- return getSelector();
- }
+
+ public static final String DELIMITER = "\0";
+
+ @Override
+ public void parse(Key key) {
+ super.parse(key);
+
+ String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER);
+ this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : "");
+
+ String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER);
+ this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : "");
+ this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : "");
+ this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : "");
+ }
+
+ @Override
+ public BaseKeyParser duplicate() {
+ return new FieldIndexKeyParser();
+ }
+
+ @Override
+ public String getSelector() {
+ return keyFields.get(SELECTOR_FIELD);
+ }
+
+ @Override
+ public String getDataType() {
+ return keyFields.get(DATATYPE_FIELD);
+ }
+
+ @Override
+ public String getFieldName() {
+ return keyFields.get(FIELDNAME_FIELD);
+ }
+
+ @Override
+ public String getUid() {
+ return keyFields.get(UID_FIELD);
+ }
+
+ public String getDataTypeUid() {
+ return getDataType() + DELIMITER + getUid();
+ }
+
+ // An alias for getSelector
+ public String getFieldValue() {
+ return getSelector();
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java Mon Dec 5 20:05:49 2011
@@ -1,79 +1,70 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package util;
import org.apache.accumulo.core.data.Key;
-public class KeyParser extends BaseKeyParser
-{
- public static final String SELECTOR_FIELD = "selector";
- public static final String DATATYPE_FIELD = "dataType";
- public static final String FIELDNAME_FIELD = "fieldName";
- public static final String UID_FIELD = "uid";
- public static final String DELIMITER = "\0";
+public class KeyParser extends BaseKeyParser {
+ public static final String SELECTOR_FIELD = "selector";
+ public static final String DATATYPE_FIELD = "dataType";
+ public static final String FIELDNAME_FIELD = "fieldName";
+ public static final String UID_FIELD = "uid";
+ public static final String DELIMITER = "\0";
+
+ @Override
+ public void parse(Key key) {
+ super.parse(key);
- @Override
- public void parse(Key key)
- {
- super.parse (key);
-
- String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER);
- this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : "");
-
- String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER);
- this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : "");
- this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : "");
- this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : "");
- }
-
- @Override
- public BaseKeyParser duplicate ()
- {
- return new KeyParser();
- }
-
- public String getSelector()
- {
- return keyFields.get(SELECTOR_FIELD);
- }
-
- public String getDataType()
- {
- return keyFields.get(DATATYPE_FIELD);
- }
-
- public String getFieldName ()
- {
- return keyFields.get(FIELDNAME_FIELD);
- }
+ String[] colFamParts = this.keyFields.get(BaseKeyParser.COLUMN_FAMILY_FIELD).split(DELIMITER);
+ this.keyFields.put(FIELDNAME_FIELD, colFamParts.length >= 2 ? colFamParts[1] : "");
- public String getUid()
- {
- return keyFields.get(UID_FIELD);
- }
-
- public String getDataTypeUid()
- {
- return getDataType()+DELIMITER+getUid();
- }
-
- // An alias for getSelector
- public String getFieldValue()
- {
- return getSelector();
- }
+ String[] colQualParts = this.keyFields.get(BaseKeyParser.COLUMN_QUALIFIER_FIELD).split(DELIMITER);
+ this.keyFields.put(SELECTOR_FIELD, colQualParts.length >= 1 ? colQualParts[0] : "");
+ this.keyFields.put(DATATYPE_FIELD, colQualParts.length >= 2 ? colQualParts[1] : "");
+ this.keyFields.put(UID_FIELD, colQualParts.length >= 3 ? colQualParts[2] : "");
+ }
+
+ @Override
+ public BaseKeyParser duplicate() {
+ return new KeyParser();
+ }
+
+ public String getSelector() {
+ return keyFields.get(SELECTOR_FIELD);
+ }
+
+ public String getDataType() {
+ return keyFields.get(DATATYPE_FIELD);
+ }
+
+ public String getFieldName() {
+ return keyFields.get(FIELDNAME_FIELD);
+ }
+
+ public String getUid() {
+ return keyFields.get(UID_FIELD);
+ }
+
+ public String getDataTypeUid() {
+ return getDataType() + DELIMITER + getUid();
+ }
+
+ // An alias for getSelector
+ public String getFieldValue() {
+ return getSelector();
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package logic;
import org.apache.hadoop.mapreduce.Counter;
@@ -21,50 +21,50 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.StatusReporter;
public class StandaloneStatusReporter extends StatusReporter {
-
- private Counters c = new Counters();
-
- private long filesProcessed = 0;
- private long recordsProcessed = 0;
-
- public Counters getCounters() {
- return c;
- }
-
- @Override
- public Counter getCounter(Enum<?> name) {
- return c.findCounter(name);
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- return c.findCounter(group, name);
- }
-
- @Override
- public void progress() {
- // do nothing
- }
-
- @Override
- public void setStatus(String status) {
- // do nothing
- }
-
- public long getFilesProcessed() {
- return filesProcessed;
- }
-
- public long getRecordsProcessed() {
- return recordsProcessed;
- }
-
- public void incrementFilesProcessed() {
- filesProcessed++;
- recordsProcessed = 0;
- }
-
- public void incrementRecordsProcessed() {
- recordsProcessed++;
- }
+
+ private Counters c = new Counters();
+
+ private long filesProcessed = 0;
+ private long recordsProcessed = 0;
+
+ public Counters getCounters() {
+ return c;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return c.findCounter(name);
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return c.findCounter(group, name);
+ }
+
+ @Override
+ public void progress() {
+ // do nothing
+ }
+
+ @Override
+ public void setStatus(String status) {
+ // do nothing
+ }
+
+ public long getFilesProcessed() {
+ return filesProcessed;
+ }
+
+ public long getRecordsProcessed() {
+ return recordsProcessed;
+ }
+
+ public void incrementFilesProcessed() {
+ filesProcessed++;
+ recordsProcessed = 0;
+ }
+
+ public void incrementRecordsProcessed() {
+ recordsProcessed++;
+ }
}
Modified: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff
==============================================================================
--- incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java (original)
+++ incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java Mon Dec 5 20:05:49 2011
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package logic;
import ingest.WikipediaConfiguration;
@@ -64,130 +64,129 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.security.Authorizations;
public class TestQueryLogic {
-
- private static final String METADATA_TABLE_NAME = "wikiMetadata";
-
- private static final String TABLE_NAME = "wiki";
-
- private static final String INDEX_TABLE_NAME = "wikiIndex";
-
- private static final String RINDEX_TABLE_NAME = "wikiReverseIndex";
-
- private class MockAccumuloRecordWriter extends RecordWriter<Text, Mutation> {
- @Override
- public void write(Text key, Mutation value) throws IOException, InterruptedException {
- try {
- writerMap.get(key).addMutation(value);
- } catch (MutationsRejectedException e) {
- throw new IOException("Error adding mutation", e);
- }
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- try {
- for (BatchWriter w : writerMap.values()) {
- w.flush();
- w.close();
- }
- } catch (MutationsRejectedException e) {
- throw new IOException("Error closing Batch Writer", e);
- }
- }
-
- }
-
- private Connector c = null;
- private Configuration conf = new Configuration();
- private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>();
- private QueryLogic table = null;
-
- @Before
- public void setup() throws Exception {
-
- Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.DEBUG);
- Logger.getLogger(QueryLogic.class).setLevel(Level.DEBUG);
- Logger.getLogger(RangeCalculator.class).setLevel(Level.DEBUG);
-
- conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
- conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
- conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
- conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
-
-
- MockInstance i = new MockInstance();
- c = i.getConnector("root", "pass");
- c.tableOperations().delete(METADATA_TABLE_NAME);
- c.tableOperations().delete(TABLE_NAME);
- c.tableOperations().delete(INDEX_TABLE_NAME);
- c.tableOperations().delete(RINDEX_TABLE_NAME);
- c.tableOperations().create(METADATA_TABLE_NAME);
- c.tableOperations().create(TABLE_NAME);
- c.tableOperations().create(INDEX_TABLE_NAME);
- c.tableOperations().create(RINDEX_TABLE_NAME);
-
- writerMap.put(new Text(METADATA_TABLE_NAME), c.createBatchWriter(METADATA_TABLE_NAME, 1000L, 1000L, 1));
- writerMap.put(new Text(TABLE_NAME), c.createBatchWriter(TABLE_NAME, 1000L, 1000L, 1));
- writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1));
- writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1));
-
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext context = new TaskAttemptContext(conf, id);
-
- RawLocalFileSystem fs = new RawLocalFileSystem();
- fs.setConf(conf);
-
- URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml");
- Assert.assertNotNull(url);
- File data = new File(url.toURI());
- Path tmpFile = new Path(data.getAbsolutePath());
-
- //Setup the Mapper
- InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
- AggregatingRecordReader rr = new AggregatingRecordReader();
- Path ocPath = new Path(tmpFile, "oc");
- OutputCommitter oc = new FileOutputCommitter(ocPath, context);
- fs.deleteOnExit(ocPath);
- StandaloneStatusReporter sr = new StandaloneStatusReporter();
- rr.initialize(split, context);
- MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter();
- WikipediaMapper mapper = new WikipediaMapper();
-
- //Load data into Mock Accumulo
- Mapper<LongWritable, Text, Text, Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
- mapper.run(con);
-
- //Flush and close record writers.
- rw.close(context);
-
- table = new QueryLogic();
- table.setMetadataTableName(METADATA_TABLE_NAME);
- table.setTableName(TABLE_NAME);
- table.setIndexTableName(INDEX_TABLE_NAME);
- table.setReverseIndexTableName(RINDEX_TABLE_NAME);
- table.setUseReadAheadIterator(false);
- table.setNumPartitions(1);
-
- }
-
- private void debugQuery(String tableName) throws Exception {
- Scanner s = c.createScanner(tableName, new Authorizations());
- Range r = new Range();
- s.setRange(r);
- for (Entry<Key,Value> entry : s)
- System.out.println(entry.getKey().toString() +" " + entry.getValue().toString());
- }
-
- @Test
- public void testTitle() {
- List<String> auths = new ArrayList<String>();
- auths.add("enwiki");
- Results results = table.runQuery(c, auths, "TITLE == 'afghanistanhistory'", null, null, null);
- for (Document doc : results.getResults()) {
- System.out.println("id: " + doc.getId());
- for (Field field : doc.getFields())
- System.out.println(field.getFieldName() + " -> " + field.getFieldValue());
- }
- }
-
+
+ private static final String METADATA_TABLE_NAME = "wikiMetadata";
+
+ private static final String TABLE_NAME = "wiki";
+
+ private static final String INDEX_TABLE_NAME = "wikiIndex";
+
+ private static final String RINDEX_TABLE_NAME = "wikiReverseIndex";
+
+ private class MockAccumuloRecordWriter extends RecordWriter<Text,Mutation> {
+ @Override
+ public void write(Text key, Mutation value) throws IOException, InterruptedException {
+ try {
+ writerMap.get(key).addMutation(value);
+ } catch (MutationsRejectedException e) {
+ throw new IOException("Error adding mutation", e);
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ for (BatchWriter w : writerMap.values()) {
+ w.flush();
+ w.close();
+ }
+ } catch (MutationsRejectedException e) {
+ throw new IOException("Error closing Batch Writer", e);
+ }
+ }
+
+ }
+
+ private Connector c = null;
+ private Configuration conf = new Configuration();
+ private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>();
+ private QueryLogic table = null;
+
+ @Before
+ public void setup() throws Exception {
+
+ Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.DEBUG);
+ Logger.getLogger(QueryLogic.class).setLevel(Level.DEBUG);
+ Logger.getLogger(RangeCalculator.class).setLevel(Level.DEBUG);
+
+ conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+ conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+ conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
+ conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+
+ MockInstance i = new MockInstance();
+ c = i.getConnector("root", "pass");
+ c.tableOperations().delete(METADATA_TABLE_NAME);
+ c.tableOperations().delete(TABLE_NAME);
+ c.tableOperations().delete(INDEX_TABLE_NAME);
+ c.tableOperations().delete(RINDEX_TABLE_NAME);
+ c.tableOperations().create(METADATA_TABLE_NAME);
+ c.tableOperations().create(TABLE_NAME);
+ c.tableOperations().create(INDEX_TABLE_NAME);
+ c.tableOperations().create(RINDEX_TABLE_NAME);
+
+ writerMap.put(new Text(METADATA_TABLE_NAME), c.createBatchWriter(METADATA_TABLE_NAME, 1000L, 1000L, 1));
+ writerMap.put(new Text(TABLE_NAME), c.createBatchWriter(TABLE_NAME, 1000L, 1000L, 1));
+ writerMap.put(new Text(INDEX_TABLE_NAME), c.createBatchWriter(INDEX_TABLE_NAME, 1000L, 1000L, 1));
+ writerMap.put(new Text(RINDEX_TABLE_NAME), c.createBatchWriter(RINDEX_TABLE_NAME, 1000L, 1000L, 1));
+
+ TaskAttemptID id = new TaskAttemptID();
+ TaskAttemptContext context = new TaskAttemptContext(conf, id);
+
+ RawLocalFileSystem fs = new RawLocalFileSystem();
+ fs.setConf(conf);
+
+ URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml");
+ Assert.assertNotNull(url);
+ File data = new File(url.toURI());
+ Path tmpFile = new Path(data.getAbsolutePath());
+
+ // Setup the Mapper
+ InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
+ AggregatingRecordReader rr = new AggregatingRecordReader();
+ Path ocPath = new Path(tmpFile, "oc");
+ OutputCommitter oc = new FileOutputCommitter(ocPath, context);
+ fs.deleteOnExit(ocPath);
+ StandaloneStatusReporter sr = new StandaloneStatusReporter();
+ rr.initialize(split, context);
+ MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter();
+ WikipediaMapper mapper = new WikipediaMapper();
+
+ // Load data into Mock Accumulo
+ Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
+ mapper.run(con);
+
+ // Flush and close record writers.
+ rw.close(context);
+
+ table = new QueryLogic();
+ table.setMetadataTableName(METADATA_TABLE_NAME);
+ table.setTableName(TABLE_NAME);
+ table.setIndexTableName(INDEX_TABLE_NAME);
+ table.setReverseIndexTableName(RINDEX_TABLE_NAME);
+ table.setUseReadAheadIterator(false);
+ table.setNumPartitions(1);
+
+ }
+
+ private void debugQuery(String tableName) throws Exception {
+ Scanner s = c.createScanner(tableName, new Authorizations());
+ Range r = new Range();
+ s.setRange(r);
+ for (Entry<Key,Value> entry : s)
+ System.out.println(entry.getKey().toString() + " " + entry.getValue().toString());
+ }
+
+ @Test
+ public void testTitle() {
+ List<String> auths = new ArrayList<String>();
+ auths.add("enwiki");
+ Results results = table.runQuery(c, auths, "TITLE == 'afghanistanhistory'", null, null, null);
+ for (Document doc : results.getResults()) {
+ System.out.println("id: " + doc.getId());
+ for (Field field : doc.getFields())
+ System.out.println(field.getFieldName() + " -> " + field.getFieldValue());
+ }
+ }
+
}