You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/12 17:06:20 UTC
svn commit: r1230608 [4/16] - in /incubator/accumulo/trunk: ./
contrib/accumulo_sample/ src/assemble/ src/core/
src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/
src/core/src/main/java/org/apache/accumulo/core/master/thrift/ src/core/...
Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.shard;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * The program reads an accumulo table written by Index.java and writes out to another table. It writes out a mapping of documents to terms. The document to term
+ * mapping is used by ContinuousQuery.java
+ *
+ */
+
+public class Reverse {
+ public static void main(String[] args) throws Exception {
+
+ if (args.length != 6) {
+ System.err.println("Usage : " + Reverse.class.getName() + " <instance> <zoo keepers> <shard table> <doc2word table> <user> <pass>");
+ System.exit(-1);
+ }
+
+ String instance = args[0];
+ String zooKeepers = args[1];
+ String inTable = args[2];
+ String outTable = args[3];
+ String user = args[4];
+ String pass = args[5];
+
+ ZooKeeperInstance zki = new ZooKeeperInstance(instance, zooKeepers);
+ Connector conn = zki.getConnector(user, pass.getBytes());
+
+ Scanner scanner = conn.createScanner(inTable, Constants.NO_AUTHS);
+ BatchWriter bw = conn.createBatchWriter(outTable, 50000000, 600000l, 4);
+
+ for (Entry<Key,Value> entry : scanner) {
+ Key key = entry.getKey();
+ Mutation m = new Mutation(key.getColumnQualifier());
+ m.put(key.getColumnFamily(), new Text(), new Value(new byte[0]));
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.dirlist;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.examples.simple.dirlist.FileCount;
+import org.apache.accumulo.examples.simple.dirlist.Ingest;
+import org.apache.accumulo.examples.simple.dirlist.QueryUtil;
+import org.apache.hadoop.io.Text;
+
+public class CountTest extends TestCase {
+ {
+ try {
+ Connector conn = new MockInstance("counttest").getConnector("root", "".getBytes());
+ conn.tableOperations().create("dirlisttable");
+ BatchWriter bw = conn.createBatchWriter("dirlisttable", 1000000l, 100l, 1);
+ ColumnVisibility cv = new ColumnVisibility();
+ // / has 1 dir
+ // /local has 2 dirs 1 file
+ // /local/user1 has 2 files
+ bw.addMutation(Ingest.buildMutation(cv, "/local", true, false, true, 272, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user1", true, false, true, 272, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user2", true, false, true, 272, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 23456, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file1", false, false, false, 2024, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file2", false, false, false, 1028, 23456, null));
+ bw.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void test() throws Exception {
+ Scanner scanner = new MockInstance("counttest").getConnector("root", "".getBytes()).createScanner("dirlisttable", new Authorizations());
+ scanner.fetchColumn(new Text("dir"), new Text("counts"));
+ assertFalse(scanner.iterator().hasNext());
+
+ FileCount fc = new FileCount("counttest", null, "root", "", "dirlisttable", "", "", true);
+ fc.run();
+
+ ArrayList<Pair<String,String>> expected = new ArrayList<Pair<String,String>>();
+ expected.add(new Pair<String,String>(QueryUtil.getRow("").toString(), "1,0,3,3"));
+ expected.add(new Pair<String,String>(QueryUtil.getRow("/local").toString(), "2,1,2,3"));
+ expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user1").toString(), "0,2,0,2"));
+ expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user2").toString(), "0,0,0,0"));
+
+ int i = 0;
+ for (Entry<Key,Value> e : scanner) {
+ assertEquals(e.getKey().getRow().toString(), expected.get(i).getFirst());
+ assertEquals(e.getValue().toString(), expected.get(i).getSecond());
+ i++;
+ }
+ assertEquals(i, expected.size());
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.examples.simple.filedata.ChunkCombiner;
+import org.apache.accumulo.examples.simple.filedata.FileDataIngest;
+
+import junit.framework.TestCase;
+
+public class ChunkCombinerTest extends TestCase {
+
+ public static class MapIterator implements SortedKeyValueIterator<Key,Value> {
+ private Iterator<Entry<Key,Value>> iter;
+ private Entry<Key,Value> entry;
+ Collection<ByteSequence> columnFamilies;
+ private SortedMap<Key,Value> map;
+ private Range range;
+
+ public MapIterator deepCopy(IteratorEnvironment env) {
+ return new MapIterator(map);
+ }
+
+ private MapIterator(SortedMap<Key,Value> map) {
+ this.map = map;
+ iter = map.entrySet().iterator();
+ this.range = new Range();
+ if (iter.hasNext())
+ entry = iter.next();
+ else
+ entry = null;
+ }
+
+ @Override
+ public Key getTopKey() {
+ return entry.getKey();
+ }
+
+ @Override
+ public Value getTopValue() {
+ return entry.getValue();
+ }
+
+ @Override
+ public boolean hasTop() {
+ return entry != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ entry = null;
+ while (iter.hasNext()) {
+ entry = iter.next();
+ if (columnFamilies.size() > 0 && !columnFamilies.contains(entry.getKey().getColumnFamilyData())) {
+ entry = null;
+ continue;
+ }
+ if (range.afterEndKey((Key) entry.getKey()))
+ entry = null;
+ break;
+ }
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (!inclusive) {
+ throw new IllegalArgumentException("can only do inclusive colf filtering");
+ }
+ this.columnFamilies = columnFamilies;
+ this.range = range;
+
+ Key key = range.getStartKey();
+ if (key == null) {
+ key = new Key();
+ }
+
+ iter = map.tailMap(key).entrySet().iterator();
+ next();
+ while (hasTop() && range.beforeStartKey(getTopKey())) {
+ next();
+ }
+ }
+
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private TreeMap<Key,Value> row1;
+ private TreeMap<Key,Value> row2;
+ private TreeMap<Key,Value> row3;
+ private TreeMap<Key,Value> allRows;
+
+ private TreeMap<Key,Value> cRow1;
+ private TreeMap<Key,Value> cRow2;
+ private TreeMap<Key,Value> cRow3;
+ private TreeMap<Key,Value> allCRows;
+
+ private TreeMap<Key,Value> cOnlyRow1;
+ private TreeMap<Key,Value> cOnlyRow2;
+ private TreeMap<Key,Value> cOnlyRow3;
+ private TreeMap<Key,Value> allCOnlyRows;
+
+ private TreeMap<Key,Value> badrow;
+
+ @Override
+ protected void setUp() {
+ row1 = new TreeMap<Key,Value>();
+ row2 = new TreeMap<Key,Value>();
+ row3 = new TreeMap<Key,Value>();
+ allRows = new TreeMap<Key,Value>();
+
+ cRow1 = new TreeMap<Key,Value>();
+ cRow2 = new TreeMap<Key,Value>();
+ cRow3 = new TreeMap<Key,Value>();
+ allCRows = new TreeMap<Key,Value>();
+
+ cOnlyRow1 = new TreeMap<Key,Value>();
+ cOnlyRow2 = new TreeMap<Key,Value>();
+ cOnlyRow3 = new TreeMap<Key,Value>();
+ allCOnlyRows = new TreeMap<Key,Value>();
+
+ badrow = new TreeMap<Key,Value>();
+
+ String refs = FileDataIngest.REFS_CF.toString();
+ String fileext = FileDataIngest.REFS_FILE_EXT;
+ String filename = FileDataIngest.REFS_ORIG_FILE;
+ String chunk_cf = FileDataIngest.CHUNK_CF.toString();
+
+ row1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
+ row1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
+ row1.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+ row1.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V1".getBytes()));
+ row1.put(new Key("row1", chunk_cf, "0001", "A"), new Value("V2".getBytes()));
+ row1.put(new Key("row1", chunk_cf, "0001", "B"), new Value("V2".getBytes()));
+
+ cRow1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
+ cRow1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
+ cRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
+ cRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
+
+ cOnlyRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
+ cOnlyRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
+
+ row2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
+ row2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
+ row2.put(new Key("row2", chunk_cf, "0000", "A|B"), new Value("V1".getBytes()));
+ row2.put(new Key("row2", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+ row2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+ row2.put(new Key("row2a", chunk_cf, "0000", "C"), new Value("V1".getBytes()));
+
+ cRow2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
+ cRow2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
+ cRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+
+ cOnlyRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+
+ row3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
+ row3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
+ row3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
+ row3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
+ row3.put(new Key("row3", chunk_cf, "0000", "(A&B)|(C&(D|E))", 10), new Value("V1".getBytes()));
+ row3.put(new Key("row3", chunk_cf, "0000", "A&B", 20), new Value("V1".getBytes()));
+ row3.put(new Key("row3", chunk_cf, "0000", "(A&B)", 10), new Value("V1".getBytes()));
+ row3.put(new Key("row3", chunk_cf, "0000", "(F|G)&(D|E)", 10), new Value("V1".getBytes()));
+
+ cRow3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
+ cRow3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
+ cRow3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
+ cRow3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
+ cRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20), new Value("V1".getBytes()));
+
+ cOnlyRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20), new Value("V1".getBytes()));
+
+ badrow.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+ badrow.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V2".getBytes()));
+
+ allRows.putAll(row1);
+ allRows.putAll(row2);
+ allRows.putAll(row3);
+
+ allCRows.putAll(cRow1);
+ allCRows.putAll(cRow2);
+ allCRows.putAll(cRow3);
+
+ allCOnlyRows.putAll(cOnlyRow1);
+ allCOnlyRows.putAll(cOnlyRow2);
+ allCOnlyRows.putAll(cOnlyRow3);
+ }
+
+ private static final Collection<ByteSequence> emptyColfs = new HashSet<ByteSequence>();
+
+ public void test1() throws IOException {
+ runTest(false, allRows, allCRows, emptyColfs);
+ runTest(true, allRows, allCRows, emptyColfs);
+ runTest(false, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
+ runTest(true, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
+
+ try {
+ runTest(true, badrow, null, emptyColfs);
+ assertNotNull(null);
+ } catch (RuntimeException e) {
+ assertNull(null);
+ }
+ }
+
+ private void runTest(boolean reseek, TreeMap<Key,Value> source, TreeMap<Key,Value> result, Collection<ByteSequence> cols) throws IOException {
+ MapIterator src = new MapIterator(source);
+ SortedKeyValueIterator<Key,Value> iter = new ChunkCombiner();
+ iter.init(src, null, null);
+ iter = iter.deepCopy(null);
+ iter.seek(new Range(), cols, true);
+
+ TreeMap<Key,Value> seen = new TreeMap<Key,Value>();
+
+ while (iter.hasTop()) {
+ assertFalse("already contains " + iter.getTopKey(), seen.containsKey(iter.getTopKey()));
+ seen.put(new Key(iter.getTopKey()), new Value(iter.getTopValue()));
+
+ if (reseek)
+ iter.seek(new Range(iter.getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL), true, null, true), cols, true);
+ else
+ iter.next();
+ }
+
+ assertEquals(result, seen);
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputFormat;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.log4j.Logger;
+
+public class ChunkInputFormatTest extends TestCase {
+ private static final Logger log = Logger.getLogger(ChunkInputStream.class);
+ List<Entry<Key,Value>> data;
+ List<Entry<Key,Value>> baddata;
+
+ {
+ data = new ArrayList<Entry<Key,Value>>();
+ ChunkInputStreamTest.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
+ ChunkInputStreamTest.addData(data, "a", "refs", "ida\0name", "A&B", "name");
+ ChunkInputStreamTest.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ ChunkInputStreamTest.addData(data, "a", "~chunk", 100, 1, "A&B", "");
+ ChunkInputStreamTest.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
+ ChunkInputStreamTest.addData(data, "b", "refs", "ida\0name", "A&B", "name");
+ ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+ ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+ ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 1, "A&B", "");
+ ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 1, "B&C", "");
+ ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 1, "D", "");
+ baddata = new ArrayList<Entry<Key,Value>>();
+ ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
+ ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
+ }
+
+ public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
+ assertEquals(e1.getKey(), e2.getKey());
+ assertEquals(e1.getValue(), e2.getValue());
+ }
+
+ public void test() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
+ MockInstance instance = new MockInstance("instance1");
+ Connector conn = instance.getConnector("root", "".getBytes());
+ conn.tableOperations().create("test");
+ BatchWriter bw = conn.createBatchWriter("test", 100000l, 100l, 5);
+
+ for (Entry<Key,Value> e : data) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ JobContext job = new JobContext(new Configuration(), new JobID());
+ ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
+ ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1");
+ ChunkInputFormat cif = new ChunkInputFormat();
+ RangeInputSplit ris = new RangeInputSplit();
+ TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<List<Entry<Key,Value>>,InputStream> rr = cif.createRecordReader(ris, tac);
+ rr.initialize(ris, tac);
+
+ assertTrue(rr.nextKeyValue());
+ List<Entry<Key,Value>> info = rr.getCurrentKey();
+ InputStream cis = rr.getCurrentValue();
+ byte[] b = new byte[20];
+ int read;
+ assertEquals(info.size(), 2);
+ entryEquals(info.get(0), data.get(0));
+ entryEquals(info.get(1), data.get(1));
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ assertTrue(rr.nextKeyValue());
+ info = rr.getCurrentKey();
+ cis = rr.getCurrentValue();
+ assertEquals(info.size(), 2);
+ entryEquals(info.get(0), data.get(4));
+ entryEquals(info.get(1), data.get(5));
+ assertEquals(read = cis.read(b), 10);
+ assertEquals(new String(b, 0, read), "qwertyuiop");
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ assertFalse(rr.nextKeyValue());
+ }
+
+ public void testErrorOnNextWithoutClose() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
+ TableExistsException {
+ MockInstance instance = new MockInstance("instance2");
+ Connector conn = instance.getConnector("root", "".getBytes());
+ conn.tableOperations().create("test");
+ BatchWriter bw = conn.createBatchWriter("test", 100000l, 100l, 5);
+
+ for (Entry<Key,Value> e : data) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ JobContext job = new JobContext(new Configuration(), new JobID());
+ ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
+ ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2");
+ ChunkInputFormat cif = new ChunkInputFormat();
+ RangeInputSplit ris = new RangeInputSplit();
+ TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
+ crr.initialize(ris, tac);
+
+ assertTrue(crr.nextKeyValue());
+ InputStream cis = crr.getCurrentValue();
+ byte[] b = new byte[5];
+ int read;
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "asdfj");
+
+ try {
+ crr.nextKeyValue();
+ assertNotNull(null);
+ } catch (Exception e) {
+ log.debug("EXCEPTION " + e.getMessage());
+ assertNull(null);
+ }
+ }
+
+ public void testInfoWithoutChunks() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
+ TableExistsException {
+ MockInstance instance = new MockInstance("instance3");
+ Connector conn = instance.getConnector("root", "".getBytes());
+ conn.tableOperations().create("test");
+ BatchWriter bw = conn.createBatchWriter("test", 100000l, 100l, 5);
+ for (Entry<Key,Value> e : baddata) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ JobContext job = new JobContext(new Configuration(), new JobID());
+ ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
+ ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3");
+ ChunkInputFormat cif = new ChunkInputFormat();
+ RangeInputSplit ris = new RangeInputSplit();
+ TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
+ crr.initialize(ris, tac);
+
+ assertTrue(crr.nextKeyValue());
+ List<Entry<Key,Value>> info = crr.getCurrentKey();
+ InputStream cis = crr.getCurrentValue();
+ byte[] b = new byte[20];
+ assertEquals(info.size(), 2);
+ entryEquals(info.get(0), baddata.get(0));
+ entryEquals(info.get(1), baddata.get(1));
+ try {
+ cis.read(b);
+ assertNotNull(null);
+ } catch (Exception e) {
+ log.debug("EXCEPTION " + e.getMessage());
+ assertNull(null);
+ }
+ try {
+ cis.close();
+ assertNotNull(null);
+ } catch (Exception e) {
+ log.debug("EXCEPTION " + e.getMessage());
+ assertNull(null);
+ }
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputStream;
+import org.apache.accumulo.examples.simple.filedata.FileDataIngest;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class ChunkInputStreamTest extends TestCase {
+ private static final Logger log = Logger.getLogger(ChunkInputStream.class);
+ List<Entry<Key,Value>> data;
+ List<Entry<Key,Value>> baddata;
+ List<Entry<Key,Value>> multidata;
+
+ {
+ data = new ArrayList<Entry<Key,Value>>();
+ addData(data, "a", "refs", "id\0ext", "A&B", "ext");
+ addData(data, "a", "refs", "id\0name", "A&B", "name");
+ addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(data, "a", "~chunk", 100, 1, "A&B", "");
+ addData(data, "b", "refs", "id\0ext", "A&B", "ext");
+ addData(data, "b", "refs", "id\0name", "A&B", "name");
+ addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+ addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+ addData(data, "b", "~chunk", 100, 1, "A&B", "");
+ addData(data, "b", "~chunk", 100, 1, "B&C", "");
+ addData(data, "b", "~chunk", 100, 1, "D", "");
+ addData(data, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(data, "c", "~chunk", 100, 1, "A&B", "asdfjkl;");
+ addData(data, "c", "~chunk", 100, 2, "A&B", "");
+ baddata = new ArrayList<Entry<Key,Value>>();
+ addData(baddata, "a", "~chunk", 100, 0, "A", "asdfjkl;");
+ addData(baddata, "b", "~chunk", 100, 0, "B", "asdfjkl;");
+ addData(baddata, "b", "~chunk", 100, 2, "C", "");
+ addData(baddata, "c", "~chunk", 100, 0, "D", "asdfjkl;");
+ addData(baddata, "c", "~chunk", 100, 2, "E", "");
+ addData(baddata, "d", "~chunk", 100, 0, "F", "asdfjkl;");
+ addData(baddata, "d", "~chunk", 100, 1, "G", "");
+ addData(baddata, "d", "~zzzzz", "colq", "H", "");
+ addData(baddata, "e", "~chunk", 100, 0, "I", "asdfjkl;");
+ addData(baddata, "e", "~chunk", 100, 1, "J", "");
+ addData(baddata, "e", "~chunk", 100, 2, "I", "asdfjkl;");
+ multidata = new ArrayList<Entry<Key,Value>>();
+ addData(multidata, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(multidata, "a", "~chunk", 100, 1, "A&B", "");
+ addData(multidata, "a", "~chunk", 200, 0, "B&C", "asdfjkl;");
+ addData(multidata, "b", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(multidata, "b", "~chunk", 200, 0, "B&C", "asdfjkl;");
+ addData(multidata, "b", "~chunk", 200, 1, "B&C", "asdfjkl;");
+ addData(multidata, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(multidata, "c", "~chunk", 100, 1, "B&C", "");
+ }
+
+ public static void addData(List<Entry<Key,Value>> data, String row, String cf, String cq, String vis, String value) {
+ data.add(new KeyValue(new Key(new Text(row), new Text(cf), new Text(cq), new Text(vis)), value.getBytes()));
+ }
+
+ public static void addData(List<Entry<Key,Value>> data, String row, String cf, int chunkSize, int chunkCount, String vis, String value) {
+ Text chunkCQ = new Text(FileDataIngest.intToBytes(chunkSize));
+ chunkCQ.append(FileDataIngest.intToBytes(chunkCount), 0, 4);
+ data.add(new KeyValue(new Key(new Text(row), new Text(cf), chunkCQ, new Text(vis)), value.getBytes()));
+ }
+
+ public void testExceptionOnMultipleSetSourceWithoutClose() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+ pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+ cis.setSource(pi);
+ try {
+ cis.setSource(pi);
+ assertNotNull(null);
+ } catch (RuntimeException e) {
+ assertNull(null);
+ }
+ cis.close();
+ }
+
+ public void testExceptionOnGetVisBeforeClose() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+
+ cis.setSource(pi);
+ try {
+ cis.getVisibilities();
+ assertNotNull(null);
+ } catch (RuntimeException e) {
+ assertNull(null);
+ }
+ cis.close();
+ cis.getVisibilities();
+ }
+
+ public void testReadIntoBufferSmallerThanChunks() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[5];
+
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+
+ cis.setSource(pi);
+ int read;
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "asdfj");
+ assertEquals(read = cis.read(b), 3);
+ assertEquals(new String(b, 0, read), "kl;");
+ assertEquals(read = cis.read(b), -1);
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "qwert");
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "yuiop");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "asdfj");
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "kl;as");
+ assertEquals(read = cis.read(b), 5);
+ assertEquals(new String(b, 0, read), "dfjkl");
+ assertEquals(read = cis.read(b), 1);
+ assertEquals(new String(b, 0, read), ";");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ assertFalse(pi.hasNext());
+ }
+
+ public void testReadIntoBufferLargerThanChunks() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[20];
+ int read;
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 10);
+ assertEquals(new String(b, 0, read), "qwertyuiop");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 16);
+ assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ assertFalse(pi.hasNext());
+ }
+
+ public void testWithAccumulo() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, IOException {
+ Connector conn = new MockInstance().getConnector("root", "".getBytes());
+ conn.tableOperations().create("test");
+ BatchWriter bw = conn.createBatchWriter("test", 100000l, 100l, 5);
+
+ for (Entry<Key,Value> e : data) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ Scanner scan = conn.createScanner("test", new Authorizations("A", "B", "C", "D"));
+
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[20];
+ int read;
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(scan.iterator());
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 10);
+ assertEquals(new String(b, 0, read), "qwertyuiop");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 16);
+ assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ assertFalse(pi.hasNext());
+ }
+
+ private static void assumeExceptionOnRead(ChunkInputStream cis, byte[] b) {
+ try {
+ cis.read(b);
+ assertNotNull(null);
+ } catch (IOException e) {
+ log.debug("EXCEPTION " + e.getMessage());
+ assertNull(null);
+ }
+ }
+
+ private static void assumeExceptionOnClose(ChunkInputStream cis) {
+ try {
+ cis.close();
+ assertNotNull(null);
+ } catch (IOException e) {
+ log.debug("EXCEPTION " + e.getMessage());
+ assertNull(null);
+ }
+ }
+
+ public void testBadData() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[20];
+ int read;
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assumeExceptionOnClose(cis);
+ // can still get visibilities after exception -- bad?
+ assertEquals(cis.getVisibilities().toString(), "[A]");
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assumeExceptionOnClose(cis);
+ assertEquals(cis.getVisibilities().toString(), "[B, C]");
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assumeExceptionOnClose(cis);
+ assertEquals(cis.getVisibilities().toString(), "[D, E]");
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[F, G]");
+ cis.close();
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ cis.close();
+ assertEquals(cis.getVisibilities().toString(), "[I, J]");
+
+ assertFalse(pi.hasNext());
+
+ pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+ cis.setSource(pi);
+ assumeExceptionOnClose(cis);
+ }
+
+ public void testBadDataWithoutClosing() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[20];
+ int read;
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ // can still get visibilities after exception -- bad?
+ assertEquals(cis.getVisibilities().toString(), "[A]");
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assertEquals(cis.getVisibilities().toString(), "[B, C]");
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assertEquals(cis.getVisibilities().toString(), "[D, E]");
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[F, G]");
+ cis.close();
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assertEquals(cis.getVisibilities().toString(), "[I, J]");
+
+ assertFalse(pi.hasNext());
+
+ pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+ cis.setSource(pi);
+ assumeExceptionOnClose(cis);
+ }
+
+ public void testMultipleChunkSizes() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[20];
+ int read;
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(multidata.iterator());
+
+ b = new byte[20];
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+
+ cis.setSource(pi);
+ assumeExceptionOnRead(cis, b);
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+ assertEquals(cis.getVisibilities().toString(), "[A&B, B&C]");
+
+ assertFalse(pi.hasNext());
+ }
+
+ public void testSingleByteRead() throws IOException {
+ ChunkInputStream cis = new ChunkInputStream();
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+
+ cis.setSource(pi);
+ assertEquals((byte) 'a', (byte) cis.read());
+ assertEquals((byte) 's', (byte) cis.read());
+ assertEquals((byte) 'd', (byte) cis.read());
+ assertEquals((byte) 'f', (byte) cis.read());
+ assertEquals((byte) 'j', (byte) cis.read());
+ assertEquals((byte) 'k', (byte) cis.read());
+ assertEquals((byte) 'l', (byte) cis.read());
+ assertEquals((byte) ';', (byte) cis.read());
+ assertEquals(cis.read(), -1);
+ cis.close();
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.examples.simple.filedata.KeyUtil;
+import org.apache.hadoop.io.Text;
+
+public class KeyUtilTest extends TestCase {
+ public static void checkSeps(String... s) {
+ Text t = KeyUtil.buildNullSepText(s);
+ String[] rets = KeyUtil.splitNullSepText(t);
+
+ int length = 0;
+ for (String str : s)
+ length += str.length();
+ assertEquals(t.getLength(), length + s.length - 1);
+ assertEquals(rets.length, s.length);
+ for (int i = 0; i < s.length; i++)
+ assertEquals(s[i], rets[i]);
+ }
+
+ public void testNullSep() {
+ checkSeps("abc", "d", "", "efgh");
+ checkSeps("ab", "");
+ checkSeps("abcde");
+ checkSeps("");
+ checkSeps("", "");
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Jan 12 16:06:14 2012
@@ -0,0 +1,2 @@
+.*
+target
Added: incubator/accumulo/trunk/src/examples/wikisearch/README
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/README?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/README (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/README Thu Jan 12 16:06:14 2012
@@ -0,0 +1,65 @@
+
+ This project contains a sample application for ingesting and querying wikipedia data.
+
+
+ Ingest
+ ------
+
+ Prerequisites
+ -------------
+ 1. Accumulo, Hadoop, and ZooKeeper must be installed and running
+ 2. ACCUMULO_HOME and ZOOKEEPER_HOME must be defined in the environment
+ 3. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
+ You will want to grab the files with the link name of pages-articles.xml.bz2
+
+
+ INSTRUCTIONS
+ ------------
+ 1. Copy the ingest/conf/wikipedia.xml.example to ingest/conf/wikipedia.xml and change it to specify Accumulo information.
+ 2. Copy the ingest/lib/wikisearch-*.jar and ingest/lib/protobuf*.jar to $ACCUMULO_HOME/lib/ext
+ 3. Then run ingest/bin/ingest.sh with one argument (the name of the directory in HDFS where the wikipedia XML
+ files reside) and this will kick off a MapReduce job to ingest the data into Accumulo.
+
+ Query
+ -----
+
+ Prerequisites
+ -------------
+ 1. The query software was tested using JBoss AS 6. Install this unless you feel like messing with the installation.
+
+ NOTE: Ran into a bug (https://issues.jboss.org/browse/RESTEASY-531) that did not allow an EJB3.1 war file. The
+ workaround is to separate the RESTEasy servlet from the EJBs by creating an EJB jar and a WAR file.
+
+ INSTRUCTIONS
+ -------------
+ 1. Copy the query/src/main/resources/META-INF/ejb-jar.xml.example file to
+ query/src/main/resources/META-INF/ejb-jar.xml. Modify to the file to contain the same
+ information that you put into the wikipedia.xml file from the Ingest step above.
+ 2. Re-build the query distribution by running 'mvn package assembly:single' in the top-level directory.
+ 3. Untar the resulting file in the $JBOSS_HOME/server/default directory.
+
+ $ cd $JBOSS_HOME/server/default
+ $ tar -xzf $ACCUMULO_HOME/src/examples/wikisearch/query/target/wikisearch-query*.tar.gz
+
+ This will place the dependent jars in the lib directory and the EJB jar into the deploy directory.
+ 4. Next, copy the wikisearch*.war file in the query-war/target directory to $JBOSS_HOME/server/default/deploy.
+ 5. Start JBoss ($JBOSS_HOME/bin/run.sh)
+ 6. Use the Accumulo shell and give the user permissions for the wikis that you loaded, for example:
+ setauths -u <user> -s all,enwiki,eswiki,frwiki,fawiki
+ 7. Copy the following jars to the $ACCUMULO_HOME/lib/ext directory from the $JBOSS_HOME/server/default/lib directory:
+
+ commons-lang*.jar
+ kryo*.jar
+ minlog*.jar
+ commons-jexl*.jar
+ google-collections*.jar
+
+ 8. Copy the $JBOSS_HOME/server/default/deploy/wikisearch-query*.jar to $ACCUMULO_HOME/lib/ext.
+
+
+ 9. At this point you should be able to open a browser and view the page: http://localhost:8080/accumulo-wikisearch/ui/ui.jsp.
+ You can issue the queries using this user interface or via the following REST urls: <host>/accumulo-wikisearch/rest/Query/xml,
+ <host>/accumulo-wikisearch/rest/Query/html, <host>/accumulo-wikisearch/rest/Query/yaml, or <host>/accumulo-wikisearch/rest/Query/json.
+ There are two parameters to the REST service, query and auths. The query parameter is the same string that you would type
+ into the search box at ui.jsp, and the auths parameter is a comma-separated list of wikis that you want to search (i.e.
+ enwiki,frwiki,dewiki, etc. Or you can use all)
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/README
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Jan 12 16:06:14 2012
@@ -0,0 +1,3 @@
+.*
+target
+lib
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh Thu Jan 12 16:06:14 2012
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+THIS_SCRIPT="$0"
+SCRIPT_DIR="${THIS_SCRIPT%/*}"
+SCRIPT_DIR=`cd $SCRIPT_DIR ; pwd`
+echo $SCRIPT_DIR
+
+ACCUMULO_HOME=${ACCUMULO_HOME}
+ZOOKEEPER_HOME=${ZOOKEEPER_HOME}
+
+#
+# Check ZOOKEEPER_HOME
+#
+if [[ -z $ZOOKEEPER_HOME ]]; then
+ echo "You must set ZOOKEEPER_HOME environment variable"
+ exit -1;
+else
+ for f in $ZOOKEEPER_HOME/zookeeper-*.jar; do
+ CLASSPATH=$f
+ break
+ done
+fi
+
+#
+# Check ACCUMULO_HOME
+#
+if [[ -z $ACCUMULO_HOME ]]; then
+ echo "You must set ACCUMULO_HOME environment variable"
+ exit -1;
+else
+ for f in $ACCUMULO_HOME/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f
+ done
+fi
+
+#
+# Add our jars
+#
+for f in $SCRIPT_DIR/../lib/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f
+done
+
+#
+# Transform the classpath into a comma-separated list also
+#
+LIBJARS=`echo $CLASSPATH | sed 's/:/,/g'`
+
+
+#
+# Map/Reduce job
+#
+JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.4.0-incubating-SNAPSHOT.jar
+CONF=$SCRIPT_DIR/../conf/wikipedia.xml
+HDFS_DATA_DIR=$1
+export HADOOP_CLASSPATH=$CLASSPATH
+echo "hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}"
+hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example Thu Jan 12 16:06:14 2012
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <property>
+ <name>wikipedia.accumulo.zookeepers</name>
+ <value><!--zookeeper servers --></value>
+ </property>
+ <property>
+ <name>wikipedia.accumulo.instance_name</name>
+ <value><!--instance name --></value>
+ </property>
+ <property>
+ <name>wikipedia.accumulo.user</name>
+ <value><!--user name --></value>
+ </property>
+ <property>
+ <name>wikipedia.accumulo.password</name>
+ <value><!-- password --></value>
+ </property>
+ <property>
+ <name>wikipedia.accumulo.table</name>
+ <value><!--table name --></value>
+ </property>
+ <property>
+ <name>wikipedia.ingest.partitions</name>
+ <value><!--number of partitions --></value>
+ </property>
+</configuration>
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml Thu Jan 12 16:06:14 2012
@@ -0,0 +1,114 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>accumulo-wikisearch</artifactId>
+ <groupId>org.apache.accumulo</groupId>
+ <version>1.5.0-incubating-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>wikisearch-ingest</artifactId>
+ <name>wikisearch-ingest</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>zookeeper</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-start</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-analyzers</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-wikipedia</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>lib</outputDirectory>
+ <!-- just grab the non-provided runtime dependencies -->
+ <includeArtifactIds>commons-lang,google-collections,lucene-core,lucene-analyzers,lucene-wikipedia,protobuf-java</includeArtifactIds>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assembly/dist.xml</descriptor>
+ </descriptors>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml Thu Jan 12 16:06:14 2012
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly>
+ <id>dist</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <baseDirectory></baseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>lib</directory>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>bin</directory>
+ <fileMode>0744</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>conf</directory>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ </fileSets>
+</assembly>
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.aggregator;
+
+import java.util.HashSet;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
+import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the global index and global reverse index, where
+ * the list of UIDs for events will be maintained in the index for low cardinality terms (Low in this case being less than 20).
+ *
+ */
+public class GlobalIndexUidAggregator implements Aggregator {
+
+ private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
+ private Uid.List.Builder builder = Uid.List.newBuilder();
+ // Using a set instead of a list so that duplicate IDs are filtered out of the list.
+ private HashSet<String> uids = new HashSet<String>();
+ private boolean seenIgnore = false;
+ public static final int MAX = 20;
+ private long count = 0;
+
+ @Override
+ public Value aggregate() {
+ // Special case logic
+ // If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true
+ // However, always maintain the count
+ if (uids.size() > MAX || seenIgnore) {
+ builder.setCOUNT(count);
+ builder.setIGNORE(true);
+ builder.clearUID();
+ } else {
+ builder.setCOUNT(count);
+ builder.setIGNORE(false);
+ builder.addAllUID(uids);
+ }
+ return new Value(builder.build().toByteArray());
+ }
+
+ @Override
+ public void collect(Value value) {
+ if (null == value || value.get().length == 0)
+ return;
+ // Collect the values, which are serialized Uid.List objects
+ try {
+ Uid.List v = Uid.List.parseFrom(value.get());
+ count = count + v.getCOUNT();
+ if (v.getIGNORE()) {
+ seenIgnore = true;
+ }
+ // Add the incoming list to this list
+ uids.addAll(v.getUIDList());
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Value passed to aggregator was not of type Uid.List", e);
+ }
+ }
+
+ @Override
+ public void reset() {
+ count = 0;
+ seenIgnore = false;
+ builder = Uid.List.newBuilder();
+ uids.clear();
+ }
+
+}
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.aggregator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
+import org.apache.accumulo.examples.wikisearch.protobuf.TermWeight;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * An Aggregator to merge together a list of term offsets and one normalized term frequency
+ *
+ */
+public class TextIndexAggregator implements Aggregator {
+ private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
+
+ private List<Integer> offsets = new ArrayList<Integer>();
+ private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder();
+ private float normalizedTermFrequency = 0f;
+
+ @Override
+ public Value aggregate() {
+ // Keep the sorted order we tried to maintain
+ for (int i = 0; i < offsets.size(); ++i) {
+ builder.addWordOffset(offsets.get(i));
+ }
+
+ builder.setNormalizedTermFrequency(normalizedTermFrequency);
+
+ return new Value(builder.build().toByteArray());
+ }
+
+ @Override
+ public void collect(Value value) {
+ // Make sure we don't aggregate something else
+ if (value == null || value.get().length == 0) {
+ return;
+ }
+
+ TermWeight.Info info;
+
+ try {
+ info = TermWeight.Info.parseFrom(value.get());
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Value passed to aggregator was not of type TermWeight.Info", e);
+ return;
+ }
+
+ // Add each offset into the list maintaining sorted order
+ for (int offset : info.getWordOffsetList()) {
+ int pos = Collections.binarySearch(offsets, offset);
+
+ if (pos < 0) {
+ // Undo the transform on the insertion point
+ offsets.add((-1 * pos) - 1, offset);
+ } else {
+ offsets.add(pos, offset);
+ }
+ }
+
+ if (info.getNormalizedTermFrequency() > 0) {
+ this.normalizedTermFrequency += info.getNormalizedTermFrequency();
+ }
+ }
+
+ @Override
+ public void reset() {
+ this.offsets.clear();
+ this.normalizedTermFrequency = 0f;
+ this.builder = TermWeight.Info.newBuilder();
+ }
+
+}
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.io.Reader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.examples.wikisearch.normalizer.NumberNormalizer;
+
+
+public class ArticleExtractor {
+
+ public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z");
+ private static NumberNormalizer nn = new NumberNormalizer();
+ private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
+
+ public static class Article {
+ int id;
+ String title;
+ long timestamp;
+ String comments;
+ String text;
+
+ private Article(int id, String title, long timestamp, String comments, String text) {
+ super();
+ this.id = id;
+ this.title = title;
+ this.timestamp = timestamp;
+ this.comments = comments;
+ this.text = text;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public String getComments() {
+ return comments;
+ }
+
+ public String getText() {
+ return text;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public Map<String,Object> getFieldValues() {
+ Map<String,Object> fields = new HashMap<String,Object>();
+ fields.put("ID", this.id);
+ fields.put("TITLE", this.title);
+ fields.put("TIMESTAMP", this.timestamp);
+ fields.put("COMMENTS", this.comments);
+ return fields;
+ }
+
+ public Map<String,String> getNormalizedFieldValues() {
+ Map<String,String> fields = new HashMap<String,String>();
+ fields.put("ID", nn.normalizeFieldValue("ID", this.id));
+ fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title));
+ fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp));
+ fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments));
+ return fields;
+ }
+
+ }
+
+ public ArticleExtractor() {}
+
+ public Article extract(Reader reader) {
+ XMLInputFactory xmlif = XMLInputFactory.newInstance();
+ xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE);
+
+ XMLStreamReader xmlr = null;
+
+ try {
+ xmlr = xmlif.createXMLStreamReader(reader);
+ } catch (XMLStreamException e1) {
+ throw new RuntimeException(e1);
+ }
+
+ QName titleName = QName.valueOf("title");
+ QName textName = QName.valueOf("text");
+ QName revisionName = QName.valueOf("revision");
+ QName timestampName = QName.valueOf("timestamp");
+ QName commentName = QName.valueOf("comment");
+ QName idName = QName.valueOf("id");
+
+ Map<QName,StringBuilder> tags = new HashMap<QName,StringBuilder>();
+ for (QName tag : new QName[] {titleName, textName, timestampName, commentName, idName}) {
+ tags.put(tag, new StringBuilder());
+ }
+
+ StringBuilder articleText = tags.get(textName);
+ StringBuilder titleText = tags.get(titleName);
+ StringBuilder timestampText = tags.get(timestampName);
+ StringBuilder commentText = tags.get(commentName);
+ StringBuilder idText = tags.get(idName);
+
+ StringBuilder current = null;
+ boolean inRevision = false;
+ while (true) {
+ try {
+ if (!xmlr.hasNext())
+ break;
+ xmlr.next();
+ } catch (XMLStreamException e) {
+ throw new RuntimeException(e);
+ }
+ QName currentName = null;
+ if (xmlr.hasName()) {
+ currentName = xmlr.getName();
+ }
+ if (xmlr.isStartElement() && tags.containsKey(currentName)) {
+ if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) {
+ current = tags.get(currentName);
+ current.setLength(0);
+ }
+ } else if (xmlr.isStartElement() && currentName.equals(revisionName)) {
+ inRevision = true;
+ } else if (xmlr.isEndElement() && currentName.equals(revisionName)) {
+ inRevision = false;
+ } else if (xmlr.isEndElement() && current != null) {
+ if (textName.equals(currentName)) {
+
+ String title = titleText.toString();
+ String text = articleText.toString();
+ String comment = commentText.toString();
+ int id = Integer.parseInt(idText.toString());
+ long timestamp;
+ try {
+ timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime();
+ return new Article(id, title, timestamp, comment, text);
+ } catch (ParseException e) {
+ return null;
+ }
+ }
+ current = null;
+ } else if (current != null && xmlr.hasText()) {
+ current.append(xmlr.getText());
+ }
+ }
+ return null;
+ }
+}
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.SimpleAnalyzer;
+
+public class WikipediaConfiguration {
+ public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
+ public final static String USER = "wikipedia.accumulo.user";
+ public final static String PASSWORD = "wikipedia.accumulo.password";
+ public final static String TABLE_NAME = "wikipedia.accumulo.table";
+
+ public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
+
+ public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
+ public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
+ public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
+
+ public final static String ANALYZER = "wikipedia.index.analyzer";
+
+ public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+
+ public static String getUser(Configuration conf) {
+ return conf.get(USER);
+ };
+
+ public static byte[] getPassword(Configuration conf) {
+ String pass = conf.get(PASSWORD);
+ if (pass == null) {
+ return null;
+ }
+ return pass.getBytes();
+ }
+
+ public static String getTableName(Configuration conf) {
+ String tablename = conf.get(TABLE_NAME);
+ if (tablename == null) {
+ throw new RuntimeException("No data table name specified in " + TABLE_NAME);
+ }
+ return tablename;
+ }
+
+ public static String getInstanceName(Configuration conf) {
+ return conf.get(INSTANCE_NAME);
+ }
+
+ public static String getZookeepers(Configuration conf) {
+ String zookeepers = conf.get(ZOOKEEPERS);
+ if (zookeepers == null) {
+ throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
+ }
+ return zookeepers;
+ }
+
+ public static Path getNamespacesFile(Configuration conf) {
+ String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
+ return new Path(filename);
+ }
+
+ public static Path getLanguagesFile(Configuration conf) {
+ String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
+ return new Path(filename);
+ }
+
+ public static Path getWorkingDirectory(Configuration conf) {
+ String filename = conf.get(WORKING_DIRECTORY);
+ return new Path(filename);
+ }
+
+ public static Analyzer getAnalyzer(Configuration conf) throws IOException {
+ Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
+ return ReflectionUtils.newInstance(analyzerClass, conf);
+ }
+
+ public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ return getInstance(conf).getConnector(getUser(conf), getPassword(conf));
+ }
+
+ public static Instance getInstance(Configuration conf) {
+ return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
+ }
+
+ public static int getNumPartitions(Configuration conf) {
+ return conf.getInt(NUM_PARTITIONS, 25);
+ }
+
+ /**
+ * Helper method to get properties from Hadoop configuration
+ *
+ * @param <T>
+ * @param conf
+ * @param propertyName
+ * @param resultClass
+ * @throws IllegalArgumentException
+ * if property is not defined, null, or empty. Or if resultClass is not handled.
+ * @return value of property
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
+ String p = conf.get(propertyName);
+ if (StringUtils.isEmpty(p))
+ throw new IllegalArgumentException(propertyName + " must be specified");
+
+ if (resultClass.equals(String.class))
+ return (T) p;
+ else if (resultClass.equals(String[].class))
+ return (T) conf.getStrings(propertyName);
+ else if (resultClass.equals(Boolean.class))
+ return (T) Boolean.valueOf(p);
+ else if (resultClass.equals(Long.class))
+ return (T) Long.valueOf(p);
+ else if (resultClass.equals(Integer.class))
+ return (T) Integer.valueOf(p);
+ else if (resultClass.equals(Float.class))
+ return (T) Float.valueOf(p);
+ else if (resultClass.equals(Double.class))
+ return (T) Double.valueOf(p);
+ else
+ throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
+
+ }
+
+}
Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native