Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/01/12 17:06:20 UTC

svn commit: r1230608 [4/16] - in /incubator/accumulo/trunk: ./ contrib/accumulo_sample/ src/assemble/ src/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ src/core/src/main/java/org/apache/accumulo/core/master/thrift/ src/core/...

Added: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.shard;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This program reads the Accumulo index table written by Index.java and writes a
+ * document-to-term mapping into another table. ContinuousQuery.java uses that mapping.
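+ * 
+ * A run of this class might look like the following, assuming the examples jar is on the
+ * accumulo script's classpath; every argument below is a placeholder:
+ * 
+ *   $ ./bin/accumulo org.apache.accumulo.examples.simple.shard.Reverse instance zoo1,zoo2 shard doc2term username password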
+ * 
+ */
+
+public class Reverse {
+  public static void main(String[] args) throws Exception {
+    
+    if (args.length != 6) {
+      System.err.println("Usage : " + Reverse.class.getName() + " <instance> <zoo keepers> <shard table> <doc2word table> <user> <pass>");
+      System.exit(-1);
+    }
+    
+    String instance = args[0];
+    String zooKeepers = args[1];
+    String inTable = args[2];
+    String outTable = args[3];
+    String user = args[4];
+    String pass = args[5];
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(instance, zooKeepers);
+    Connector conn = zki.getConnector(user, pass.getBytes());
+    
+    Scanner scanner = conn.createScanner(inTable, Constants.NO_AUTHS);
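+    // batch writer settings: 50MB buffer, 600,000 ms (10 minute) max latency, 4 write threads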
+    BatchWriter bw = conn.createBatchWriter(outTable, 50000000, 600000L, 4);
+    
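+    // invert each index entry: the document ID (column qualifier) becomes the output row
+    // and the term (column family) is kept as the column family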
+    for (Entry<Key,Value> entry : scanner) {
+      Key key = entry.getKey();
+      Mutation m = new Mutation(key.getColumnQualifier());
+      m.put(key.getColumnFamily(), new Text(), new Value(new byte[0]));
+      bw.addMutation(m);
+    }
+    
+    bw.close();
+    
+  }
+}

Propchange: incubator/accumulo/trunk/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.dirlist;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+
+public class CountTest extends TestCase {
+  {
+    try {
+      Connector conn = new MockInstance("counttest").getConnector("root", "".getBytes());
+      conn.tableOperations().create("dirlisttable");
+      BatchWriter bw = conn.createBatchWriter("dirlisttable", 1000000L, 100L, 1);
+      ColumnVisibility cv = new ColumnVisibility();
+      // / has 1 dir
+      // /local has 2 dirs 1 file
+      // /local/user1 has 2 files
+      bw.addMutation(Ingest.buildMutation(cv, "/local", true, false, true, 272, 12345, null));
+      bw.addMutation(Ingest.buildMutation(cv, "/local/user1", true, false, true, 272, 12345, null));
+      bw.addMutation(Ingest.buildMutation(cv, "/local/user2", true, false, true, 272, 12345, null));
+      bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 12345, null));
+      bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 23456, null));
+      bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file1", false, false, false, 2024, 12345, null));
+      bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file2", false, false, false, 1028, 23456, null));
+      bw.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+  
+  public void test() throws Exception {
+    Scanner scanner = new MockInstance("counttest").getConnector("root", "".getBytes()).createScanner("dirlisttable", new Authorizations());
+    scanner.fetchColumn(new Text("dir"), new Text("counts"));
+    assertFalse(scanner.iterator().hasNext());
+    
+    FileCount fc = new FileCount("counttest", null, "root", "", "dirlisttable", "", "", true);
+    fc.run();
+    
+    ArrayList<Pair<String,String>> expected = new ArrayList<Pair<String,String>>();
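+    // each expected value encodes: <dir count>,<file count>,<recursive dir count>,<recursive file count>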
+    expected.add(new Pair<String,String>(QueryUtil.getRow("").toString(), "1,0,3,3"));
+    expected.add(new Pair<String,String>(QueryUtil.getRow("/local").toString(), "2,1,2,3"));
+    expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user1").toString(), "0,2,0,2"));
+    expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user2").toString(), "0,0,0,0"));
+    
+    int i = 0;
+    for (Entry<Key,Value> e : scanner) {
+      assertEquals(e.getKey().getRow().toString(), expected.get(i).getFirst());
+      assertEquals(e.getValue().toString(), expected.get(i).getSecond());
+      i++;
+    }
+    assertEquals(i, expected.size());
+  }
+}

Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+import junit.framework.TestCase;
+
+public class ChunkCombinerTest extends TestCase {
+  
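+  /**
+   * A minimal SortedKeyValueIterator over an in-memory sorted map, used as the test
+   * source for ChunkCombiner.
+   */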
+  public static class MapIterator implements SortedKeyValueIterator<Key,Value> {
+    private Iterator<Entry<Key,Value>> iter;
+    private Entry<Key,Value> entry;
+    Collection<ByteSequence> columnFamilies;
+    private SortedMap<Key,Value> map;
+    private Range range;
+    
+    public MapIterator deepCopy(IteratorEnvironment env) {
+      return new MapIterator(map);
+    }
+    
+    private MapIterator(SortedMap<Key,Value> map) {
+      this.map = map;
+      iter = map.entrySet().iterator();
+      this.range = new Range();
+      if (iter.hasNext())
+        entry = iter.next();
+      else
+        entry = null;
+    }
+    
+    @Override
+    public Key getTopKey() {
+      return entry.getKey();
+    }
+    
+    @Override
+    public Value getTopValue() {
+      return entry.getValue();
+    }
+    
+    @Override
+    public boolean hasTop() {
+      return entry != null;
+    }
+    
+    @Override
+    public void next() throws IOException {
+      entry = null;
+      while (iter.hasNext()) {
+        entry = iter.next();
+        if (columnFamilies.size() > 0 && !columnFamilies.contains(entry.getKey().getColumnFamilyData())) {
+          entry = null;
+          continue;
+        }
+        if (range.afterEndKey(entry.getKey()))
+          entry = null;
+        break;
+      }
+    }
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+      if (!inclusive) {
+        throw new IllegalArgumentException("can only do inclusive colf filtering");
+      }
+      this.columnFamilies = columnFamilies;
+      this.range = range;
+      
+      Key key = range.getStartKey();
+      if (key == null) {
+        key = new Key();
+      }
+      
+      iter = map.tailMap(key).entrySet().iterator();
+      next();
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
+    }
+    
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  private TreeMap<Key,Value> row1;
+  private TreeMap<Key,Value> row2;
+  private TreeMap<Key,Value> row3;
+  private TreeMap<Key,Value> allRows;
+  
+  private TreeMap<Key,Value> cRow1;
+  private TreeMap<Key,Value> cRow2;
+  private TreeMap<Key,Value> cRow3;
+  private TreeMap<Key,Value> allCRows;
+  
+  private TreeMap<Key,Value> cOnlyRow1;
+  private TreeMap<Key,Value> cOnlyRow2;
+  private TreeMap<Key,Value> cOnlyRow3;
+  private TreeMap<Key,Value> allCOnlyRows;
+  
+  private TreeMap<Key,Value> badrow;
+  
+  @Override
+  protected void setUp() {
+    row1 = new TreeMap<Key,Value>();
+    row2 = new TreeMap<Key,Value>();
+    row3 = new TreeMap<Key,Value>();
+    allRows = new TreeMap<Key,Value>();
+    
+    cRow1 = new TreeMap<Key,Value>();
+    cRow2 = new TreeMap<Key,Value>();
+    cRow3 = new TreeMap<Key,Value>();
+    allCRows = new TreeMap<Key,Value>();
+    
+    cOnlyRow1 = new TreeMap<Key,Value>();
+    cOnlyRow2 = new TreeMap<Key,Value>();
+    cOnlyRow3 = new TreeMap<Key,Value>();
+    allCOnlyRows = new TreeMap<Key,Value>();
+    
+    badrow = new TreeMap<Key,Value>();
+    
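+    // rowN holds raw input entries; cRowN is the expected ChunkCombiner output for rowN;
+    // cOnlyRowN is the expected output when only the chunk column family is fetched;
+    // badrow holds two versions of the same chunk with different values, which should fail
+    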
+    String refs = FileDataIngest.REFS_CF.toString();
+    String fileext = FileDataIngest.REFS_FILE_EXT;
+    String filename = FileDataIngest.REFS_ORIG_FILE;
+    String chunk_cf = FileDataIngest.CHUNK_CF.toString();
+    
+    row1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
+    row1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V1".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0001", "A"), new Value("V2".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0001", "B"), new Value("V2".getBytes()));
+    
+    cRow1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
+    cRow1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
+    cRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
+    cRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
+    
+    cOnlyRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
+    cOnlyRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
+    
+    row2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
+    row2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
+    row2.put(new Key("row2", chunk_cf, "0000", "A|B"), new Value("V1".getBytes()));
+    row2.put(new Key("row2", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+    row2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+    row2.put(new Key("row2a", chunk_cf, "0000", "C"), new Value("V1".getBytes()));
+    
+    cRow2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
+    cRow2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
+    cRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+    
+    cOnlyRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+    
+    row3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
+    row3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
+    row3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
+    row3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "(A&B)|(C&(D|E))", 10), new Value("V1".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "A&B", 20), new Value("V1".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "(A&B)", 10), new Value("V1".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "(F|G)&(D|E)", 10), new Value("V1".getBytes()));
+    
+    cRow3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20), new Value("V1".getBytes()));
+    
+    cOnlyRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20), new Value("V1".getBytes()));
+    
+    badrow.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+    badrow.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V2".getBytes()));
+    
+    allRows.putAll(row1);
+    allRows.putAll(row2);
+    allRows.putAll(row3);
+    
+    allCRows.putAll(cRow1);
+    allCRows.putAll(cRow2);
+    allCRows.putAll(cRow3);
+    
+    allCOnlyRows.putAll(cOnlyRow1);
+    allCOnlyRows.putAll(cOnlyRow2);
+    allCOnlyRows.putAll(cOnlyRow3);
+  }
+  
+  private static final Collection<ByteSequence> emptyColfs = new HashSet<ByteSequence>();
+  
+  public void test1() throws IOException {
+    runTest(false, allRows, allCRows, emptyColfs);
+    runTest(true, allRows, allCRows, emptyColfs);
+    runTest(false, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
+    runTest(true, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
+    
+    try {
+      runTest(true, badrow, null, emptyColfs);
+      fail("expected a RuntimeException on conflicting chunk data");
+    } catch (RuntimeException e) {
+      // expected
+    }
+  }
+  
+  private void runTest(boolean reseek, TreeMap<Key,Value> source, TreeMap<Key,Value> result, Collection<ByteSequence> cols) throws IOException {
+    MapIterator src = new MapIterator(source);
+    SortedKeyValueIterator<Key,Value> iter = new ChunkCombiner();
+    iter.init(src, null, null);
+    iter = iter.deepCopy(null);
+    iter.seek(new Range(), cols, true);
+    
+    TreeMap<Key,Value> seen = new TreeMap<Key,Value>();
+    
+    while (iter.hasTop()) {
+      assertFalse("already contains " + iter.getTopKey(), seen.containsKey(iter.getTopKey()));
+      seen.put(new Key(iter.getTopKey()), new Value(iter.getTopValue()));
+      
+      if (reseek)
+        iter.seek(new Range(iter.getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL), true, null, true), cols, true);
+      else
+        iter.next();
+    }
+    
+    assertEquals(result, seen);
+  }
+}

Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkCombinerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.log4j.Logger;
+
+public class ChunkInputFormatTest extends TestCase {
+  private static final Logger log = Logger.getLogger(ChunkInputFormatTest.class);
+  List<Entry<Key,Value>> data;
+  List<Entry<Key,Value>> baddata;
+  
+  {
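+    // data holds two complete files (rows a and b); baddata has refs entries but no chunks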
+    data = new ArrayList<Entry<Key,Value>>();
+    ChunkInputStreamTest.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamTest.addData(data, "a", "refs", "ida\0name", "A&B", "name");
+    ChunkInputStreamTest.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    ChunkInputStreamTest.addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    ChunkInputStreamTest.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamTest.addData(data, "b", "refs", "ida\0name", "A&B", "name");
+    ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    ChunkInputStreamTest.addData(data, "b", "~chunk", 100, 1, "D", "");
+    baddata = new ArrayList<Entry<Key,Value>>();
+    ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
+  }
+  
+  public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
+    assertEquals(e1.getKey(), e2.getKey());
+    assertEquals(e1.getValue(), e2.getValue());
+  }
+  
+  public void test() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
+    MockInstance instance = new MockInstance("instance1");
+    Connector conn = instance.getConnector("root", "".getBytes());
+    conn.tableOperations().create("test");
+    BatchWriter bw = conn.createBatchWriter("test", 100000L, 100L, 5);
+    
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    JobContext job = new JobContext(new Configuration(), new JobID());
+    ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
+    ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1");
+    ChunkInputFormat cif = new ChunkInputFormat();
+    RangeInputSplit ris = new RangeInputSplit();
+    TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+    RecordReader<List<Entry<Key,Value>>,InputStream> rr = cif.createRecordReader(ris, tac);
+    rr.initialize(ris, tac);
+    
+    assertTrue(rr.nextKeyValue());
+    List<Entry<Key,Value>> info = rr.getCurrentKey();
+    InputStream cis = rr.getCurrentValue();
+    byte[] b = new byte[20];
+    int read;
+    assertEquals(info.size(), 2);
+    entryEquals(info.get(0), data.get(0));
+    entryEquals(info.get(1), data.get(1));
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    
+    assertTrue(rr.nextKeyValue());
+    info = rr.getCurrentKey();
+    cis = rr.getCurrentValue();
+    assertEquals(info.size(), 2);
+    entryEquals(info.get(0), data.get(4));
+    entryEquals(info.get(1), data.get(5));
+    assertEquals(read = cis.read(b), 10);
+    assertEquals(new String(b, 0, read), "qwertyuiop");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    
+    assertFalse(rr.nextKeyValue());
+  }
+  
+  public void testErrorOnNextWithoutClose() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
+      TableExistsException {
+    MockInstance instance = new MockInstance("instance2");
+    Connector conn = instance.getConnector("root", "".getBytes());
+    conn.tableOperations().create("test");
+    BatchWriter bw = conn.createBatchWriter("test", 100000L, 100L, 5);
+    
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    JobContext job = new JobContext(new Configuration(), new JobID());
+    ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
+    ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2");
+    ChunkInputFormat cif = new ChunkInputFormat();
+    RangeInputSplit ris = new RangeInputSplit();
+    TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+    RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
+    crr.initialize(ris, tac);
+    
+    assertTrue(crr.nextKeyValue());
+    InputStream cis = crr.getCurrentValue();
+    byte[] b = new byte[5];
+    int read;
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "asdfj");
+    
+    try {
+      crr.nextKeyValue();
+      fail("expected an exception on nextKeyValue() without closing the chunk stream");
+    } catch (Exception e) {
+      log.debug("EXCEPTION " + e.getMessage());
+      // expected
+    }
+  }
+  
+  public void testInfoWithoutChunks() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
+      TableExistsException {
+    MockInstance instance = new MockInstance("instance3");
+    Connector conn = instance.getConnector("root", "".getBytes());
+    conn.tableOperations().create("test");
+    BatchWriter bw = conn.createBatchWriter("test", 100000L, 100L, 5);
+    for (Entry<Key,Value> e : baddata) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    JobContext job = new JobContext(new Configuration(), new JobID());
+    ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
+    ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3");
+    ChunkInputFormat cif = new ChunkInputFormat();
+    RangeInputSplit ris = new RangeInputSplit();
+    TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+    RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
+    crr.initialize(ris, tac);
+    
+    assertTrue(crr.nextKeyValue());
+    List<Entry<Key,Value>> info = crr.getCurrentKey();
+    InputStream cis = crr.getCurrentValue();
+    byte[] b = new byte[20];
+    assertEquals(info.size(), 2);
+    entryEquals(info.get(0), baddata.get(0));
+    entryEquals(info.get(1), baddata.get(1));
+    try {
+      cis.read(b);
+      fail("expected an exception reading a file that has no chunks");
+    } catch (Exception e) {
+      log.debug("EXCEPTION " + e.getMessage());
+      // expected
+    }
+    try {
+      cis.close();
+      fail("expected an exception closing a stream that has no chunks");
+    } catch (Exception e) {
+      log.debug("EXCEPTION " + e.getMessage());
+      // expected
+    }
+  }
+}

Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class ChunkInputStreamTest extends TestCase {
+  private static final Logger log = Logger.getLogger(ChunkInputStream.class);
+  List<Entry<Key,Value>> data;
+  List<Entry<Key,Value>> baddata;
+  List<Entry<Key,Value>> multidata;
+  
+  {
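+    // data: readable chunk streams; baddata: broken chunk sequences; multidata: files chunked at more than one chunk size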
+    data = new ArrayList<Entry<Key,Value>>();
+    addData(data, "a", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "a", "refs", "id\0name", "A&B", "name");
+    addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "b", "refs", "id\0name", "A&B", "name");
+    addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    addData(data, "b", "~chunk", 100, 1, "D", "");
+    addData(data, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 1, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 2, "A&B", "");
+    baddata = new ArrayList<Entry<Key,Value>>();
+    addData(baddata, "a", "~chunk", 100, 0, "A", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 0, "B", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 2, "C", "");
+    addData(baddata, "c", "~chunk", 100, 0, "D", "asdfjkl;");
+    addData(baddata, "c", "~chunk", 100, 2, "E", "");
+    addData(baddata, "d", "~chunk", 100, 0, "F", "asdfjkl;");
+    addData(baddata, "d", "~chunk", 100, 1, "G", "");
+    addData(baddata, "d", "~zzzzz", "colq", "H", "");
+    addData(baddata, "e", "~chunk", 100, 0, "I", "asdfjkl;");
+    addData(baddata, "e", "~chunk", 100, 1, "J", "");
+    addData(baddata, "e", "~chunk", 100, 2, "I", "asdfjkl;");
+    multidata = new ArrayList<Entry<Key,Value>>();
+    addData(multidata, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "a", "~chunk", 100, 1, "A&B", "");
+    addData(multidata, "a", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 1, "B&C", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 1, "B&C", "");
+  }
+  
+  public static void addData(List<Entry<Key,Value>> data, String row, String cf, String cq, String vis, String value) {
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), new Text(cq), new Text(vis)), value.getBytes()));
+  }
+  
+  public static void addData(List<Entry<Key,Value>> data, String row, String cf, int chunkSize, int chunkCount, String vis, String value) {
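+    // the chunk column qualifier is intToBytes(chunkSize) followed by intToBytes(chunkCount)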
+    Text chunkCQ = new Text(FileDataIngest.intToBytes(chunkSize));
+    chunkCQ.append(FileDataIngest.intToBytes(chunkCount), 0, 4);
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), chunkCQ, new Text(vis)), value.getBytes()));
+  }
+  
+  public void testExceptionOnMultipleSetSourceWithoutClose() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+    cis.setSource(pi);
+    try {
+      cis.setSource(pi);
+      fail("expected a RuntimeException on a second setSource without close");
+    } catch (RuntimeException e) {
+      // expected
+    }
+    cis.close();
+  }
+  
+  public void testExceptionOnGetVisBeforeClose() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+    
+    cis.setSource(pi);
+    try {
+      cis.getVisibilities();
+      fail("expected a RuntimeException calling getVisibilities before close");
+    } catch (RuntimeException e) {
+      // expected
+    }
+    cis.close();
+    cis.getVisibilities();
+  }
+  
+  public void testReadIntoBufferSmallerThanChunks() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[5];
+    
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+    
+    cis.setSource(pi);
+    int read;
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "asdfj");
+    assertEquals(read = cis.read(b), 3);
+    assertEquals(new String(b, 0, read), "kl;");
+    assertEquals(read = cis.read(b), -1);
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "qwert");
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "yuiop");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "asdfj");
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "kl;as");
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "dfjkl");
+    assertEquals(read = cis.read(b), 1);
+    assertEquals(new String(b, 0, read), ";");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    
+    assertFalse(pi.hasNext());
+  }
+  
+  public void testReadIntoBufferLargerThanChunks() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 10);
+    assertEquals(new String(b, 0, read), "qwertyuiop");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 16);
+    assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    
+    assertFalse(pi.hasNext());
+  }
+  
+  public void testWithAccumulo() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, IOException {
+    Connector conn = new MockInstance().getConnector("root", "".getBytes());
+    conn.tableOperations().create("test");
+    BatchWriter bw = conn.createBatchWriter("test", 100000L, 100L, 5);
+    
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+    
+    Scanner scan = conn.createScanner("test", new Authorizations("A", "B", "C", "D"));
+    
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(scan.iterator());
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 10);
+    assertEquals(new String(b, 0, read), "qwertyuiop");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 16);
+    assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    
+    assertFalse(pi.hasNext());
+  }
+  
+  private static void assumeExceptionOnRead(ChunkInputStream cis, byte[] b) {
+    try {
+      cis.read(b);
+      fail("expected an IOException from read");
+    } catch (IOException e) {
+      log.debug("EXCEPTION " + e.getMessage());
+      // expected
+    }
+  }
+  
+  private static void assumeExceptionOnClose(ChunkInputStream cis) {
+    try {
+      cis.close();
+      fail("expected an IOException from close");
+    } catch (IOException e) {
+      log.debug("EXCEPTION " + e.getMessage());
+      // expected
+    }
+  }
+  
+  public void testBadData() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assumeExceptionOnClose(cis);
+    // can still get visibilities after exception -- bad?
+    assertEquals(cis.getVisibilities().toString(), "[A]");
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assumeExceptionOnClose(cis);
+    assertEquals(cis.getVisibilities().toString(), "[B, C]");
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assumeExceptionOnClose(cis);
+    assertEquals(cis.getVisibilities().toString(), "[D, E]");
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[F, G]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[I, J]");
+    
+    assertFalse(pi.hasNext());
+    
+    pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+    cis.setSource(pi);
+    assumeExceptionOnClose(cis);
+  }
+  
+  public void testBadDataWithoutClosing() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    // can still get visibilities after exception -- bad?
+    assertEquals(cis.getVisibilities().toString(), "[A]");
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[B, C]");
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[D, E]");
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[F, G]");
+    cis.close();
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[I, J]");
+    
+    assertFalse(pi.hasNext());
+    
+    pi = new PeekingIterator<Entry<Key,Value>>(baddata.iterator());
+    cis.setSource(pi);
+    assumeExceptionOnClose(cis);
+  }
+  
+  public void testMultipleChunkSizes() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(multidata.iterator());
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C]");
+    
+    assertFalse(pi.hasNext());
+  }
+  
+  public void testSingleByteRead() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(data.iterator());
+    
+    cis.setSource(pi);
+    assertEquals((byte) 'a', (byte) cis.read());
+    assertEquals((byte) 's', (byte) cis.read());
+    assertEquals((byte) 'd', (byte) cis.read());
+    assertEquals((byte) 'f', (byte) cis.read());
+    assertEquals((byte) 'j', (byte) cis.read());
+    assertEquals((byte) 'k', (byte) cis.read());
+    assertEquals((byte) 'l', (byte) cis.read());
+    assertEquals((byte) ';', (byte) cis.read());
+    assertEquals(cis.read(), -1);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+  }
+}

Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java (added)
+++ incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.filedata;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.examples.simple.filedata.KeyUtil;
+import org.apache.hadoop.io.Text;
+
+public class KeyUtilTest extends TestCase {
+  public static void checkSeps(String... s) {
+    Text t = KeyUtil.buildNullSepText(s);
+    String[] rets = KeyUtil.splitNullSepText(t);
+    
+    int length = 0;
+    for (String str : s)
+      length += str.length();
+    assertEquals(t.getLength(), length + s.length - 1);
+    assertEquals(rets.length, s.length);
+    for (int i = 0; i < s.length; i++)
+      assertEquals(s[i], rets[i]);
+  }
+  
+  public void testNullSep() {
+    checkSeps("abc", "d", "", "efgh");
+    checkSeps("ab", "");
+    checkSeps("abcde");
+    checkSeps("");
+    checkSeps("", "");
+  }
+}

Propchange: incubator/accumulo/trunk/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/KeyUtilTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Jan 12 16:06:14 2012
@@ -0,0 +1,2 @@
+.*
+target

Added: incubator/accumulo/trunk/src/examples/wikisearch/README
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/README?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/README (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/README Thu Jan 12 16:06:14 2012
@@ -0,0 +1,65 @@
+
+ This project contains a sample application for ingesting and querying wikipedia data.
+ 
+  
+ Ingest
+ ------
+ 
+ 	Prerequisites
+ 	-------------
+ 	1. Accumulo, Hadoop, and ZooKeeper must be installed and running
+ 	2. ACCUMULO_HOME and ZOOKEEPER_HOME must be defined in the environment
+ 	3. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
+	   Grab the files whose link name is pages-articles.xml.bz2.
+ 
+ 
+ 	INSTRUCTIONS
+ 	------------
+	1. Copy the ingest/conf/wikipedia.xml.example to ingest/conf/wikipedia.xml and edit it to specify your Accumulo connection information.
+	2. Copy the ingest/lib/wikisearch-*.jar and ingest/lib/protobuf*.jar to $ACCUMULO_HOME/lib/ext
+	3. Run ingest/bin/ingest.sh with one argument (the name of the HDFS directory where the wikipedia XML
+           files reside); this kicks off a MapReduce job that ingests the data into Accumulo.
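+           For example, if the dump files were copied to an HDFS directory named
+           /wikisearch/input (a placeholder; substitute your own directory):
+
+              $ ./ingest/bin/ingest.sh /wikisearch/input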
+   
+ Query
+ -----
+ 
+ 	Prerequisites
+ 	-------------
+	1. The query software was tested using JBoss AS 6. Install that version unless you are prepared to adapt the installation yourself.
+ 	
+	NOTE: We ran into a bug (https://issues.jboss.org/browse/RESTEASY-531) that prevents deploying an EJB 3.1 WAR file. The
+	workaround is to separate the RESTEasy servlet from the EJBs by creating an EJB jar and a WAR file.
+	
+	INSTRUCTIONS
+	-------------
+	1. Copy the query/src/main/resources/META-INF/ejb-jar.xml.example file to 
+	   query/src/main/resources/META-INF/ejb-jar.xml. Modify the file to contain the same 
+	   information that you put into the wikipedia.xml file from the Ingest step above. 
+	   information that you put into the wikipedia.xml file from the Ingest step above. 
+	2. Re-build the query distribution by running 'mvn package assembly:single' in the top-level directory. 
+	3. Untar the resulting file in the $JBOSS_HOME/server/default directory.
+
+              $ cd $JBOSS_HOME/server/default
+              $ tar -xzf $ACCUMULO_HOME/src/examples/wikisearch/query/target/wikisearch-query*.tar.gz
+ 
+           This will place the dependent jars in the lib directory and the EJB jar into the deploy directory.
+	4. Next, copy the wikisearch*.war file from the query-war/target directory to $JBOSS_HOME/server/default/deploy. 
+	5. Start JBoss ($JBOSS_HOME/bin/run.sh)
+	6. Use the Accumulo shell and give the user permissions for the wikis that you loaded, for example: 
+			setauths -u <user> -s all,enwiki,eswiki,frwiki,fawiki
+	7. Copy the following jars to the $ACCUMULO_HOME/lib/ext directory from the $JBOSS_HOME/server/default/lib directory:
+	
+		commons-lang*.jar
+		kryo*.jar
+		minlog*.jar
+		commons-jexl*.jar
+		google-collections*.jar
+		
+	8. Copy the $JBOSS_HOME/server/default/deploy/wikisearch-query*.jar to $ACCUMULO_HOME/lib/ext.
+
+	9. At this point you should be able to open a browser and view the page: http://localhost:8080/accumulo-wikisearch/ui/ui.jsp.
+	   You can issue queries using this user interface or via the following REST urls: <host>/accumulo-wikisearch/rest/Query/xml,
+	   <host>/accumulo-wikisearch/rest/Query/html, <host>/accumulo-wikisearch/rest/Query/yaml, or <host>/accumulo-wikisearch/rest/Query/json.
+	   There are two parameters to the REST service, query and auths. The query parameter is the same string that you would type
+	   into the search box at ui.jsp, and the auths parameter is a comma-separated list of wikis that you want to search (e.g.
+	   enwiki,frwiki,dewiki; or you can use all).
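+
+	   For example, a query might hypothetically be issued with curl as follows; the query
+	   string is a placeholder for whatever you would type into the search box at ui.jsp:
+
+              $ curl "http://localhost:8080/accumulo-wikisearch/rest/Query/json?query=<query>&auths=enwiki,frwiki"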

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/README
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Jan 12 16:06:14 2012
@@ -0,0 +1,3 @@
+.*
+target
+lib

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh Thu Jan 12 16:06:14 2012
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+THIS_SCRIPT="$0"
+SCRIPT_DIR="${THIS_SCRIPT%/*}"
+SCRIPT_DIR=`cd $SCRIPT_DIR ; pwd`
+echo $SCRIPT_DIR
+
+ACCUMULO_HOME=${ACCUMULO_HOME}
+ZOOKEEPER_HOME=${ZOOKEEPER_HOME}
+
+#
+# Check ZOOKEEPER_HOME
+#
+if [[ -z $ZOOKEEPER_HOME ]]; then
+	echo "You must set ZOOKEEPER_HOME environment variable"
+	exit -1;
+else
+	for f in $ZOOKEEPER_HOME/zookeeper-*.jar; do
+		CLASSPATH=$f
+		break
+	done	
+fi
+
+#
+# Check ACCUMULO_HOME
+#
+if [[ -z $ACCUMULO_HOME ]]; then
+	echo "You must set ACCUMULO_HOME environment variable"
+	exit -1;
+else
+	for f in $ACCUMULO_HOME/lib/*.jar; do
+		CLASSPATH=${CLASSPATH}:$f
+	done	
+fi
+
+#
+# Add our jars
+#
+for f in $SCRIPT_DIR/../lib/*.jar; do
+	CLASSPATH=${CLASSPATH}:$f  
+done
+
+#
+# Also expose the classpath as a comma-separated list for hadoop's -libjars option
+#
+LIBJARS=`echo $CLASSPATH | sed 's/:/,/g'`
+
+
+#
+# Map/Reduce job
+#
+JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.4.0-incubating-SNAPSHOT.jar
+CONF=$SCRIPT_DIR/../conf/wikipedia.xml
+HDFS_DATA_DIR=$1
+export HADOOP_CLASSPATH=$CLASSPATH
+echo "hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}"
+hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/bin/ingest.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia.xml.example Thu Jan 12 16:06:14 2012
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>wikipedia.accumulo.zookeepers</name>
+    <value><!--zookeeper servers --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.instance_name</name>
+    <value><!--instance name --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.user</name>
+    <value><!--user name --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.password</name>
+    <value><!-- password --></value>
+  </property>
+  <property>
+    <name>wikipedia.accumulo.table</name>
+    <value><!--table name --></value>
+  </property>
+  <property>
+    <name>wikipedia.ingest.partitions</name>
+    <value><!--number of partitions --></value>
+  </property>
+</configuration>
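
The ingester reads these settings through a standard Hadoop Configuration (see WikipediaConfiguration later
in this commit). A minimal sketch, assuming the file has been copied to conf/wikipedia.xml as ingest.sh expects:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch: load the example config and read one wikisearch property.
public class LoadWikipediaConf {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource(new Path("conf/wikipedia.xml")); // path assumed from ingest.sh's CONF variable
    // Property names match the <name> elements above, e.g. wikipedia.ingest.partitions.
    int partitions = conf.getInt("wikipedia.ingest.partitions", 25);
    System.out.println("partitions = " + partitions);
  }
}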

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml Thu Jan 12 16:06:14 2012
@@ -0,0 +1,114 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>accumulo-wikisearch</artifactId>
+    <groupId>org.apache.accumulo</groupId>
+    <version>1.5.0-incubating-SNAPSHOT</version>
+    <relativePath>../</relativePath>
+  </parent>
+
+  <artifactId>wikisearch-ingest</artifactId>
+  <name>wikisearch-ingest</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-start</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.collections</groupId>
+      <artifactId>google-collections</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-wikipedia</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>lib</outputDirectory>
+              <!-- just grab the non-provided runtime dependencies -->
+              <includeArtifactIds>commons-lang,google-collections,lucene-core,lucene-analyzers,lucene-wikipedia,protobuf-java</includeArtifactIds>
+              <excludeTransitive>true</excludeTransitive>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/assembly/dist.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml Thu Jan 12 16:06:14 2012
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+  <id>dist</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <baseDirectory></baseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>lib</directory>
+      <fileMode>0644</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>bin</directory>
+      <fileMode>0744</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>conf</directory>
+      <fileMode>0644</fileMode>
+    </fileSet>
+  </fileSets>
+</assembly>

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/assembly/dist.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.aggregator;
+
+import java.util.HashSet;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
+import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the global index and global reverse index, where
+ * the list of UIDs for events is maintained in the index only for low-cardinality terms (low here meaning at most MAX, currently 20, distinct UIDs).
+ * 
+ */
+public class GlobalIndexUidAggregator implements Aggregator {
+  
+  private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class);
+  private Uid.List.Builder builder = Uid.List.newBuilder();
+  // Using a set instead of a list so that duplicate IDs are filtered out of the list.
+  private HashSet<String> uids = new HashSet<String>();
+  private boolean seenIgnore = false;
+  public static final int MAX = 20;
+  private long count = 0;
+  
+  @Override
+  public Value aggregate() {
+    // Special case logic
+    // If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true
+    // However, always maintain the count
+    if (uids.size() > MAX || seenIgnore) {
+      builder.setCOUNT(count);
+      builder.setIGNORE(true);
+      builder.clearUID();
+    } else {
+      builder.setCOUNT(count);
+      builder.setIGNORE(false);
+      builder.addAllUID(uids);
+    }
+    return new Value(builder.build().toByteArray());
+  }
+  
+  @Override
+  public void collect(Value value) {
+    if (null == value || value.get().length == 0)
+      return;
+    // Collect the values, which are serialized Uid.List objects
+    try {
+      Uid.List v = Uid.List.parseFrom(value.get());
+      count = count + v.getCOUNT();
+      if (v.getIGNORE()) {
+        seenIgnore = true;
+      }
+      // Add the incoming list to this list
+      uids.addAll(v.getUIDList());
+    } catch (InvalidProtocolBufferException e) {
+      log.error("Value passed to aggregator was not of type Uid.List", e);
+    }
+  }
+  
+  @Override
+  public void reset() {
+    count = 0;
+    seenIgnore = false;
+    builder = Uid.List.newBuilder();
+    uids.clear();
+  }
+  
+}
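
A minimal sketch of the aggregator contract in action, merging two serialized Uid.List values. The UIDs and
counts are made up for illustration; addUID is the standard protobuf builder method for the repeated UID field.

import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.examples.wikisearch.aggregator.GlobalIndexUidAggregator;
import org.apache.accumulo.examples.wikisearch.protobuf.Uid;

// Hypothetical driver: merge two single-UID lists and inspect the result.
public class UidAggregatorDemo {
  public static void main(String[] args) throws Exception {
    GlobalIndexUidAggregator agg = new GlobalIndexUidAggregator();
    agg.reset();
    agg.collect(new Value(Uid.List.newBuilder().setCOUNT(1).setIGNORE(false).addUID("doc-1").build().toByteArray()));
    agg.collect(new Value(Uid.List.newBuilder().setCOUNT(1).setIGNORE(false).addUID("doc-2").build().toByteArray()));
    Uid.List merged = Uid.List.parseFrom(agg.aggregate().get());
    // Below MAX (20) distinct UIDs, the merged list keeps both UIDs and IGNORE stays false.
    System.out.println(merged.getCOUNT() + " " + merged.getIGNORE() + " " + merged.getUIDList());
  }
}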

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/GlobalIndexUidAggregator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.aggregator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.aggregation.Aggregator;
+import org.apache.accumulo.examples.wikisearch.protobuf.TermWeight;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * An Aggregator that merges TermWeight.Info values for a single term: word offsets are combined into one sorted list and the normalized term frequencies are summed.
+ * 
+ */
+public class TextIndexAggregator implements Aggregator {
+  private static final Logger log = Logger.getLogger(TextIndexAggregator.class);
+  
+  private List<Integer> offsets = new ArrayList<Integer>();
+  private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder();
+  private float normalizedTermFrequency = 0f;
+  
+  @Override
+  public Value aggregate() {
+    // Keep the sorted order we tried to maintain
+    for (int i = 0; i < offsets.size(); ++i) {
+      builder.addWordOffset(offsets.get(i));
+    }
+    
+    builder.setNormalizedTermFrequency(normalizedTermFrequency);
+    
+    return new Value(builder.build().toByteArray());
+  }
+  
+  @Override
+  public void collect(Value value) {
+    // Make sure we don't aggregate something else
+    if (value == null || value.get().length == 0) {
+      return;
+    }
+    
+    TermWeight.Info info;
+    
+    try {
+      info = TermWeight.Info.parseFrom(value.get());
+    } catch (InvalidProtocolBufferException e) {
+      log.error("Value passed to aggregator was not of type TermWeight.Info", e);
+      return;
+    }
+    
+    // Add each offset into the list maintaining sorted order
+    for (int offset : info.getWordOffsetList()) {
+      int pos = Collections.binarySearch(offsets, offset);
+      
+      if (pos < 0) {
+        // Undo the transform on the insertion point
+        offsets.add((-1 * pos) - 1, offset);
+      } else {
+        offsets.add(pos, offset);
+      }
+    }
+    
+    if (info.getNormalizedTermFrequency() > 0) {
+      this.normalizedTermFrequency += info.getNormalizedTermFrequency();
+    }
+  }
+  
+  @Override
+  public void reset() {
+    this.offsets.clear();
+    this.normalizedTermFrequency = 0f;
+    this.builder = TermWeight.Info.newBuilder();
+  }
+  
+}
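
A minimal sketch of merging two TermWeight.Info values with this aggregator; the offsets and frequencies
are made up for illustration.

import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.examples.wikisearch.aggregator.TextIndexAggregator;
import org.apache.accumulo.examples.wikisearch.protobuf.TermWeight;

// Hypothetical driver: merge two TermWeight.Info values for the same term.
public class TextIndexAggregatorDemo {
  public static void main(String[] args) throws Exception {
    TextIndexAggregator agg = new TextIndexAggregator();
    agg.reset();
    agg.collect(new Value(TermWeight.Info.newBuilder().addWordOffset(3).addWordOffset(9).setNormalizedTermFrequency(0.1f).build().toByteArray()));
    agg.collect(new Value(TermWeight.Info.newBuilder().addWordOffset(5).setNormalizedTermFrequency(0.2f).build().toByteArray()));
    TermWeight.Info merged = TermWeight.Info.parseFrom(agg.aggregate().get());
    // Offsets come back sorted (3, 5, 9) and the frequencies are summed (about 0.3).
    System.out.println(merged.getWordOffsetList() + " " + merged.getNormalizedTermFrequency());
  }
}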

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/aggregator/TextIndexAggregator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.io.Reader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
+import org.apache.accumulo.examples.wikisearch.normalizer.NumberNormalizer;
+
+
+public class ArticleExtractor {
+  
+  public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z"); // SimpleDateFormat is not thread-safe; do not share this instance across threads without synchronization
+  private static NumberNormalizer nn = new NumberNormalizer();
+  private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer();
+  
+  public static class Article {
+    int id;
+    String title;
+    long timestamp;
+    String comments;
+    String text;
+    
+    private Article(int id, String title, long timestamp, String comments, String text) {
+      super();
+      this.id = id;
+      this.title = title;
+      this.timestamp = timestamp;
+      this.comments = comments;
+      this.text = text;
+    }
+    
+    public int getId() {
+      return id;
+    }
+    
+    public String getTitle() {
+      return title;
+    }
+    
+    public String getComments() {
+      return comments;
+    }
+    
+    public String getText() {
+      return text;
+    }
+    
+    public long getTimestamp() {
+      return timestamp;
+    }
+    
+    public Map<String,Object> getFieldValues() {
+      Map<String,Object> fields = new HashMap<String,Object>();
+      fields.put("ID", this.id);
+      fields.put("TITLE", this.title);
+      fields.put("TIMESTAMP", this.timestamp);
+      fields.put("COMMENTS", this.comments);
+      return fields;
+    }
+    
+    public Map<String,String> getNormalizedFieldValues() {
+      Map<String,String> fields = new HashMap<String,String>();
+      fields.put("ID", nn.normalizeFieldValue("ID", this.id));
+      fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title));
+      fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp));
+      fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments));
+      return fields;
+    }
+    
+  }
+  
+  public ArticleExtractor() {}
+  
+  public Article extract(Reader reader) {
+    XMLInputFactory xmlif = XMLInputFactory.newInstance();
+    xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE);
+    
+    XMLStreamReader xmlr = null;
+    
+    try {
+      xmlr = xmlif.createXMLStreamReader(reader);
+    } catch (XMLStreamException e1) {
+      throw new RuntimeException(e1);
+    }
+    
+    QName titleName = QName.valueOf("title");
+    QName textName = QName.valueOf("text");
+    QName revisionName = QName.valueOf("revision");
+    QName timestampName = QName.valueOf("timestamp");
+    QName commentName = QName.valueOf("comment");
+    QName idName = QName.valueOf("id");
+    
+    Map<QName,StringBuilder> tags = new HashMap<QName,StringBuilder>();
+    for (QName tag : new QName[] {titleName, textName, timestampName, commentName, idName}) {
+      tags.put(tag, new StringBuilder());
+    }
+    
+    StringBuilder articleText = tags.get(textName);
+    StringBuilder titleText = tags.get(titleName);
+    StringBuilder timestampText = tags.get(timestampName);
+    StringBuilder commentText = tags.get(commentName);
+    StringBuilder idText = tags.get(idName);
+    
+    StringBuilder current = null;
+    boolean inRevision = false;
+    while (true) {
+      try {
+        if (!xmlr.hasNext())
+          break;
+        xmlr.next();
+      } catch (XMLStreamException e) {
+        throw new RuntimeException(e);
+      }
+      QName currentName = null;
+      if (xmlr.hasName()) {
+        currentName = xmlr.getName();
+      }
+      if (xmlr.isStartElement() && tags.containsKey(currentName)) {
+        if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) {
+          current = tags.get(currentName);
+          current.setLength(0);
+        }
+      } else if (xmlr.isStartElement() && currentName.equals(revisionName)) {
+        inRevision = true;
+      } else if (xmlr.isEndElement() && currentName.equals(revisionName)) {
+        inRevision = false;
+      } else if (xmlr.isEndElement() && current != null) {
+        if (textName.equals(currentName)) {
+          
+          String title = titleText.toString();
+          String text = articleText.toString();
+          String comment = commentText.toString();
+          int id = Integer.parseInt(idText.toString());
+          long timestamp;
+          try {
+            timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime();
+            return new Article(id, title, timestamp, comment, text);
+          } catch (ParseException e) {
+            return null;
+          }
+        }
+        current = null;
+      } else if (current != null && xmlr.hasText()) {
+        current.append(xmlr.getText());
+      }
+    }
+    return null;
+  }
+}
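
A minimal sketch of extract() on a MediaWiki-export-style fragment; the page content below is made up for
illustration.

import java.io.StringReader;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;

// Hypothetical driver: parse one small page and print a few extracted fields.
public class ArticleExtractorDemo {
  public static void main(String[] args) {
    String xml = "<page><title>Example</title><id>7</id>"
        + "<revision><id>42</id><timestamp>2011-01-01T00:00:00Z</timestamp>"
        + "<comment>first cut</comment><text>Some article text.</text></revision></page>";
    Article article = new ArticleExtractor().extract(new StringReader(xml));
    // The page-level id (7) is kept; the revision id (42) is deliberately skipped by the extractor.
    System.out.println(article.getId() + " " + article.getTitle() + " " + article.getTimestamp());
  }
}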

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ArticleExtractor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1230608&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (added)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Thu Jan 12 16:06:14 2012
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.SimpleAnalyzer;
+
+public class WikipediaConfiguration {
+  public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name";
+  public final static String USER = "wikipedia.accumulo.user";
+  public final static String PASSWORD = "wikipedia.accumulo.password";
+  public final static String TABLE_NAME = "wikipedia.accumulo.table";
+  
+  public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers";
+  
+  public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename";
+  public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename";
+  public final static String WORKING_DIRECTORY = "wikipedia.ingest.working";
+  
+  public final static String ANALYZER = "wikipedia.index.analyzer";
+  
+  public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+  
+  public static String getUser(Configuration conf) {
+    return conf.get(USER);
+  }
+  
+  public static byte[] getPassword(Configuration conf) {
+    String pass = conf.get(PASSWORD);
+    if (pass == null) {
+      return null;
+    }
+    return pass.getBytes();
+  }
+  
+  public static String getTableName(Configuration conf) {
+    String tablename = conf.get(TABLE_NAME);
+    if (tablename == null) {
+      throw new RuntimeException("No data table name specified in " + TABLE_NAME);
+    }
+    return tablename;
+  }
+  
+  public static String getInstanceName(Configuration conf) {
+    return conf.get(INSTANCE_NAME);
+  }
+  
+  public static String getZookeepers(Configuration conf) {
+    String zookeepers = conf.get(ZOOKEEPERS);
+    if (zookeepers == null) {
+      throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS);
+    }
+    return zookeepers;
+  }
+  
+  public static Path getNamespacesFile(Configuration conf) {
+    String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString());
+    return new Path(filename);
+  }
+  
+  public static Path getLanguagesFile(Configuration conf) {
+    String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString());
+    return new Path(filename);
+  }
+  
+  public static Path getWorkingDirectory(Configuration conf) {
+    String filename = conf.get(WORKING_DIRECTORY);
+    return new Path(filename);
+  }
+  
+  public static Analyzer getAnalyzer(Configuration conf) throws IOException {
+    Class<? extends Analyzer> analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class);
+    return ReflectionUtils.newInstance(analyzerClass, conf);
+  }
+  
+  public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+    return getInstance(conf).getConnector(getUser(conf), getPassword(conf));
+  }
+  
+  public static Instance getInstance(Configuration conf) {
+    return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf));
+  }
+  
+  public static int getNumPartitions(Configuration conf) {
+    return conf.getInt(NUM_PARTITIONS, 25);
+  }
+  
+  /**
+   * Helper method to get a required property from the Hadoop configuration, converted to the requested type. Note that despite its name, this method returns
+   * the property value; it throws if the property is missing.
+   * 
+   * @param <T>
+   *          type to convert the property value to
+   * @param conf
+   *          Hadoop configuration to read from
+   * @param propertyName
+   *          name of the property to look up
+   * @param resultClass
+   *          one of String, String[], Boolean, Long, Integer, Float, or Double
+   * @throws IllegalArgumentException
+   *           if the property is not defined, null, or empty, or if resultClass is not one of the handled types
+   * @return value of the property
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T isNull(Configuration conf, String propertyName, Class<T> resultClass) {
+    String p = conf.get(propertyName);
+    if (StringUtils.isEmpty(p))
+      throw new IllegalArgumentException(propertyName + " must be specified");
+    
+    if (resultClass.equals(String.class))
+      return (T) p;
+    else if (resultClass.equals(String[].class))
+      return (T) conf.getStrings(propertyName);
+    else if (resultClass.equals(Boolean.class))
+      return (T) Boolean.valueOf(p);
+    else if (resultClass.equals(Long.class))
+      return (T) Long.valueOf(p);
+    else if (resultClass.equals(Integer.class))
+      return (T) Integer.valueOf(p);
+    else if (resultClass.equals(Float.class))
+      return (T) Float.valueOf(p);
+    else if (resultClass.equals(Double.class))
+      return (T) Double.valueOf(p);
+    else
+      throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
+    
+  }
+  
+}
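
A minimal sketch of using this class with an in-code Configuration rather than wikipedia.xml; the table name
below is illustrative.

import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
import org.apache.hadoop.conf.Configuration;

// Hypothetical driver: populate a Configuration in code and read values back through the helpers.
public class WikipediaConfigurationDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(WikipediaConfiguration.NUM_PARTITIONS, "10");
    conf.set(WikipediaConfiguration.TABLE_NAME, "wikiIndex"); // illustrative table name
    System.out.println(WikipediaConfiguration.getNumPartitions(conf)); // 10; falls back to 25 when unset
    // isNull, despite its name, returns the typed value or throws if the property is missing.
    String table = WikipediaConfiguration.isNull(conf, WikipediaConfiguration.TABLE_NAME, String.class);
    System.out.println(table);
  }
}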

Propchange: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native