Posted to blur-commits@incubator.apache.org by am...@apache.org on 2016/08/30 01:57:49 UTC

[04/13] git commit: Third round of updates.

Third round of updates.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ea50630a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ea50630a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ea50630a

Branch: refs/heads/master
Commit: ea50630a38d67675a61a916b144f3c0ce85d7f7a
Parents: 0141656
Author: Aaron McCurry <am...@gmail.com>
Authored: Sat May 7 13:11:54 2016 -0400
Committer: Aaron McCurry <am...@gmail.com>
Committed: Sat May 7 13:11:54 2016 -0400

----------------------------------------------------------------------
 blur-indexer/pom.xml                            |  58 +++
 blur-indexer/src/main/assemble/bin.xml          |  45 ++
 .../mapreduce/lib/update/BlurIndexCounter.java  |  17 +
 .../mapreduce/lib/update/ClusterDriver.java     | 362 ++++++++++++++
 .../blur/mapreduce/lib/update/FasterDriver.java | 486 +++++++++++++++++++
 .../update/HdfsConfigurationNamespaceMerge.java | 115 +++++
 .../lib/update/InputSplitPruneUtil.java         | 133 +++++
 .../lib/update/LookupBuilderMapper.java         |  18 +
 .../lib/update/LookupBuilderReducer.java        | 165 +++++++
 .../lib/update/MapperForExistingDataMod.java    |  46 ++
 .../MapperForExistingDataWithIndexLookup.java   | 228 +++++++++
 .../lib/update/MapperForNewDataMod.java         |  82 ++++
 .../lib/update/MergeSortRowIdMatcher.java       | 372 ++++++++++++++
 .../lib/update/PrunedBlurInputFormat.java       |  57 +++
 .../update/PrunedSequenceFileInputFormat.java   |  59 +++
 .../src/main/resources/blur-site.properties     |   1 +
 .../src/main/resources/program-log4j.xml        |  29 ++
 blur-indexer/src/main/resources/test-log4j.xml  |  46 ++
 18 files changed, 2319 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/pom.xml
----------------------------------------------------------------------
diff --git a/blur-indexer/pom.xml b/blur-indexer/pom.xml
new file mode 100644
index 0000000..c7c1753
--- /dev/null
+++ b/blur-indexer/pom.xml
@@ -0,0 +1,58 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-indexer</artifactId>
+	<version>0.2.8</version>
+	<name>blur-indexer</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<blur.version>0.3.0.incubating.2.5.0.cdh5.3.3-SNAPSHOT</blur.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred</artifactId>
+			<version>${blur.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.9</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<descriptor>src/main/assemble/bin.xml</descriptor>
+					<finalName>blur-indexer-${project.version}</finalName>
+				</configuration>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/assemble/bin.xml b/blur-indexer/src/main/assemble/bin.xml
new file mode 100644
index 0000000..5fddd56
--- /dev/null
+++ b/blur-indexer/src/main/assemble/bin.xml
@@ -0,0 +1,45 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>blur-indexer-${project.version}/lib</outputDirectory>
+      <unpack>false</unpack>
+      <includes>
+        <include>org.apache.blur:blur-indexer</include>
+        <include>org.apache.blur:*</include>
+        <include>org.apache.zookeeper:zookeeper</include>
+        <include>org.slf4j:slf4j-api</include>
+        <include>org.slf4j:slf4j-log4j12</include>
+        <include>org.json:json</include>
+        <include>log4j:log4j</include>
+        <include>com.yammer.metrics:*</include>
+        <include>com.google.guava:guava</include>
+        <include>org.apache.httpcomponents:*</include>
+        <include>org.apache.lucene:*</include>
+        <include>com.spatial4j:spatial4j</include>
+        <include>commons-cli:commons-cli</include>
+        <include>org.eclipse.jetty:*</include>
+        <include>com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru</include>
+        <include>jline:jline</include>
+        <include>com.fasterxml.jackson.core:*</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.scriptSourceDirectory}</directory>
+      <outputDirectory>blur-indexer-${project.version}</outputDirectory>
+      <excludes>
+        <exclude>**/.empty</exclude>
+      </excludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
new file mode 100644
index 0000000..a9caabb
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
@@ -0,0 +1,17 @@
+package org.apache.blur.mapreduce.lib.update;
+
+public enum BlurIndexCounter {
+
+  NEW_RECORDS,
+  ROW_IDS_FROM_INDEX,
+  ROW_IDS_TO_UPDATE_FROM_NEW_DATA,
+  ROW_IDS_FROM_NEW_DATA,
+  
+  INPUT_FORMAT_MAPPER, 
+  INPUT_FORMAT_EXISTING_RECORDS,
+  
+  LOOKUP_MAPPER, 
+  LOOKUP_MAPPER_EXISTING_RECORDS, 
+  LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT
+
+}
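
For reference, counters from this enum are incremented through the task
context inside the mappers and reducers added later in this commit (see
LookupBuilderReducer below); a minimal sketch, assuming a
framework-supplied "context" variable is in scope:

    // Inside a Mapper.map or Reducer.reduce method; "context" is supplied by Hadoop.
    context.getCounter(BlurIndexCounter.NEW_RECORDS).increment(1);
    long newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS).getValue();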

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
new file mode 100644
index 0000000..d44adf1
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
@@ -0,0 +1,362 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.xml.DOMConfigurator;
+
+public class ClusterDriver extends Configured implements Tool {
+
+  private static final String BLUR_ENV = "blur.env";
+  private static final Log LOG = LogFactory.getLog(ClusterDriver.class);
+  private static final String _SEP = "_";
+  private static final String IMPORT = "import";
+
+  public static void main(String[] args) throws Exception {
+    String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE");
+    System.out.println("Log file path [" + logFilePath + "]");
+    System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath);
+    URL url = ClusterDriver.class.getResource("/program-log4j.xml");
+    if (url != null) {
+      LOG.info("Reseting log4j config from classpath resource [{0}]", url);
+      LogManager.resetConfiguration();
+      DOMConfigurator.configure(url);
+    }
+    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
+    System.exit(res);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int c = 0;
+    final String blurEnv = args[c++];
+    final String blurZkConnection = args[c++];
+    final String extraConfig = args[c++];
+    final int reducerMultiplier = Integer.parseInt(args[c++]);
+    final Configuration conf = getConf();
+
+    final ExecutorService service = Executors.newCachedThreadPool();
+    final AtomicBoolean running = new AtomicBoolean();
+    running.set(true);
+
+    // Load configs for all filesystems.
+    Path path = new Path(extraConfig);
+    Configuration mergeHdfsConfigs = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(path.getFileSystem(conf), path);
+    conf.addResource(mergeHdfsConfigs);
+    conf.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, blurZkConnection);
+    conf.set(BLUR_ENV, blurEnv);
+
+    final Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+
+    stopAllExistingMRJobs(blurEnv, conf);
+    cleanUpOldImportDirs(client, conf);
+    moveInprogressDirsBackToNew(client, conf);
+    unlockLockedTables(client);
+
+    Map<String, Future<Void>> futures = new HashMap<String, Future<Void>>();
+    while (running.get()) {
+      LOG.debug("Starting index update check for blur cluster [" + blurZkConnection + "].");
+      try {
+        List<String> tableList = client.tableList();
+        startMissingIndexerThreads(tableList, service, futures, blurZkConnection, conf, client, reducerMultiplier);
+      } catch (TException t) {
+        LOG.error("Unknown Blur Thrift Error, Retrying...", t);
+      }
+      Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+    }
+    return 0;
+  }
+
+  private void unlockLockedTables(Iface client) throws BlurException, TException {
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      TableDescriptor tableDescriptor = client.describe(table);
+      if (tableDescriptor.isEnabled()) {
+        unlockLockedTables(client, table);
+      }
+    }
+  }
+
+  private void unlockLockedTables(Iface client, String table) throws BlurException, TException {
+    Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+    for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+      List<String> value = e.getValue();
+      if (value.contains(FasterDriver.MRUPDATE_SNAPSHOT)) {
+        LOG.info("Unlocking table [{0}]", table);
+        client.removeSnapshot(table, FasterDriver.MRUPDATE_SNAPSHOT);
+        return;
+      }
+    }
+  }
+
+  private void moveInprogressDirsBackToNew(Iface client, Configuration conf) throws BlurException, TException,
+      IOException {
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
+      Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+      Path newData = new Path(mrIncWorkingPath, FasterDriver.NEW);
+      Path inprogressData = new Path(mrIncWorkingPath, FasterDriver.INPROGRESS);
+      FileSystem fileSystem = inprogressData.getFileSystem(conf);
+      FileStatus[] listStatus = fileSystem.listStatus(inprogressData);
+      for (FileStatus fileStatus : listStatus) {
+        Path src = fileStatus.getPath();
+        Path dst = new Path(newData, src.getName());
+        if (fileSystem.rename(src, dst)) {
+          LOG.info("Moved [{0}] to [{1}] to be reprocessed.", src, dst);
+        } else {
+          LOG.error("Could not move [{0}] to [{1}] to be reprocessed.", src, dst);
+        }
+      }
+    }
+  }
+
+  private void cleanUpOldImportDirs(Iface client, Configuration conf) throws BlurException, TException, IOException {
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      cleanUpOldImportDirs(client, conf, table);
+    }
+  }
+
+  private void cleanUpOldImportDirs(Iface client, Configuration conf, String table) throws BlurException, TException,
+      IOException {
+    TableDescriptor descriptor = client.describe(table);
+    String tableUri = descriptor.getTableUri();
+    Path tablePath = new Path(tableUri);
+    FileSystem fileSystem = tablePath.getFileSystem(getConf());
+    Path importPath = new Path(tablePath, IMPORT);
+    if (fileSystem.exists(importPath)) {
+      for (FileStatus fileStatus : fileSystem.listStatus(importPath)) {
+        Path path = fileStatus.getPath();
+        LOG.info("Removing failed import [{0}]", path);
+        fileSystem.delete(path, true);
+      }
+    }
+  }
+
+  private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException,
+      InterruptedException {
+    Cluster cluster = new Cluster(conf);
+    JobStatus[] allJobStatuses = cluster.getAllJobStatuses();
+    for (JobStatus jobStatus : allJobStatuses) {
+      if (jobStatus.isJobComplete()) {
+        continue;
+      }
+      String jobFile = jobStatus.getJobFile();
+      JobID jobID = jobStatus.getJobID();
+      Job job = cluster.getJob(jobID);
+      FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+      Configuration configuration = new Configuration(false);
+      Path path = new Path(jobFile);
+      Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+      if (hasReadAccess(fileSystem, makeQualified)) {
+        try (FSDataInputStream in = fileSystem.open(makeQualified)) {
+          configuration.addResource(copy(in));
+        }
+        String jobBlurEnv = configuration.get(BLUR_ENV);
+        LOG.info("Checking job [{0}] has env [{1}] current env set to [{2}]", jobID, jobBlurEnv, blurEnv);
+        if (blurEnv.equals(jobBlurEnv)) {
+          LOG.info("Killing running job [{0}]", jobID);
+          job.killJob();
+        }
+      }
+    }
+  }
+
+  private static InputStream copy(FSDataInputStream input) throws IOException {
+    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+      IOUtils.copy(input, outputStream);
+      return new ByteArrayInputStream(outputStream.toByteArray());
+    }
+  }
+
+  private static boolean hasReadAccess(FileSystem fileSystem, Path p) {
+    try {
+      fileSystem.access(p, FsAction.READ);
+      return true;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  private Callable<Void> getCallable(final String blurZkConnection, final Configuration conf, final Iface client,
+      final String table, final int reducerMultiplier) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        String originalThreadName = Thread.currentThread().getName();
+        try {
+          Thread.currentThread().setName(table);
+          if (!isEnabled(client, table)) {
+            LOG.info("Table [" + table + "] is not enabled.");
+            return null;
+          }
+          waitForDataToLoad(client, table);
+          LOG.debug("Starting index update for table [" + table + "].");
+          final String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
+          final String outputPathStr = getOutputPathStr(client, table);
+          Path path = new Path(outputPathStr);
+          FileSystem fileSystem = path.getFileSystem(getConf());
+
+          Configuration configuration = new Configuration(conf);
+          BlurInputFormat.setMaxNumberOfMaps(configuration, 10000);
+
+          FasterDriver driver = new FasterDriver();
+          driver.setConf(configuration);
+          try {
+            driver.run(new String[] { table, mrIncWorkingPathStr, outputPathStr, blurZkConnection,
+                Integer.toString(reducerMultiplier) });
+          } finally {
+            if (fileSystem.exists(path)) {
+              fileSystem.delete(path, true);
+            }
+          }
+          return null;
+        } finally {
+          Thread.currentThread().setName(originalThreadName);
+        }
+      }
+    };
+  }
+
+  private void startMissingIndexerThreads(List<String> tableList, ExecutorService service,
+      Map<String, Future<Void>> futures, final String blurZkConnection, final Configuration conf, final Iface client,
+      int reducerMultiplier) throws BlurException, TException {
+    Set<String> tables = new HashSet<String>(tableList);
+
+    // remove futures that are complete
+    for (String table : tables) {
+      Future<Void> future = futures.get(table);
+      if (future != null) {
+        if (future.isDone()) {
+          try {
+            future.get();
+          } catch (InterruptedException e) {
+            LOG.error("Unknown error while processing table [" + table + "].", e);
+          } catch (ExecutionException e) {
+            LOG.error("Unknown error while processing table [" + table + "].", e.getCause());
+          }
+          futures.remove(table);
+        } else {
+          LOG.info("Update for table [" + table + "] still running.");
+        }
+      }
+    }
+
+    // start missing tables
+    for (String table : tables) {
+      if (!futures.containsKey(table)) {
+        if (isEnabled(client, table)) {
+          Future<Void> future = service.submit(getCallable(blurZkConnection, conf, client, table, reducerMultiplier));
+          futures.put(table, future);
+        }
+      }
+    }
+  }
+
+  public static void waitForDataToLoad(Iface client, String table) throws BlurException, TException,
+      InterruptedException {
+    if (isFullyLoaded(client.tableStats(table))) {
+      return;
+    }
+    while (true) {
+      TableStats tableStats = client.tableStats(table);
+      if (isFullyLoaded(tableStats)) {
+        LOG.info("Data load complete in table [" + table + "] [" + tableStats + "]");
+        return;
+      }
+      LOG.info("Waiting for data to load in table [" + table + "] [" + tableStats + "]");
+      Thread.sleep(5000);
+    }
+  }
+
+  private static boolean isFullyLoaded(TableStats tableStats) {
+    if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean isEnabled(Iface client, String table) throws BlurException, TException {
+    TableDescriptor tableDescriptor = client.describe(table);
+    return tableDescriptor.isEnabled();
+  }
+
+  private void mkdirs(FileSystem fileSystem, Path path) throws IOException {
+    if (fileSystem.exists(path)) {
+      return;
+    }
+    LOG.info("Creating path [" + path + "].");
+    if (!fileSystem.mkdirs(path)) {
+      LOG.error("Path [" + path + "] could not be created.");
+    }
+  }
+
+  public static String getMRIncWorkingPathStr(Iface client, String table) throws BlurException, TException, IOException {
+    TableDescriptor descriptor = client.describe(table);
+    Map<String, String> tableProperties = descriptor.getTableProperties();
+    if (tableProperties != null) {
+      String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+      if (workingPath != null) {
+        return workingPath;
+      }
+    }
+    throw new IOException("Table [" + table + "] does not have the property ["
+        + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "] setup correctly.");
+  }
+
+  private String getOutputPathStr(Iface client, String table) throws BlurException, TException, IOException {
+    TableDescriptor descriptor = client.describe(table);
+    String tableUri = descriptor.getTableUri();
+    Path tablePath = new Path(tableUri);
+    FileSystem fileSystem = tablePath.getFileSystem(getConf());
+    Path importPath = new Path(tablePath, IMPORT);
+    mkdirs(fileSystem, importPath);
+    return new Path(importPath, IMPORT + _SEP + System.currentTimeMillis() + _SEP + UUID.randomUUID().toString())
+        .toString();
+  }
+
+}
\ No newline at end of file
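
ClusterDriver is the long-running entry point: it merges the extra HDFS
configs, connects to Blur through ZooKeeper, cleans up state left over
from earlier runs (stale MR jobs, inprogress dirs, the mrupdate
snapshot), and then polls the table list every ten seconds, submitting
one FasterDriver run per enabled table. A minimal launch sketch with
hypothetical argument values (env name, ZooKeeper connection string,
extra config path, reducer multiplier):

    // Hypothetical values; the four arguments map to run()'s positional args.
    String[] args = { "prod", "zk1:2181/blur", "hdfs:///blur/extra-conf", "2" };
    int rc = ToolRunner.run(new Configuration(), new ClusterDriver(), args);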

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
new file mode 100644
index 0000000..f43cba5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/FasterDriver.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class FasterDriver extends Configured implements Tool {
+
+  public static final String BLUR_UPDATE_ID = "blur.update.id";
+  private static final String BLUR_EXEC_TYPE = "blur.exec.type";
+  public static final String TMP = "tmp";
+
+  public enum EXEC {
+    MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC
+  }
+
+  public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
+  public static final String CACHE = "cache";
+  public static final String COMPLETE = "complete";
+  public static final String INPROGRESS = "inprogress";
+  public static final String NEW = "new";
+  private static final Log LOG = LogFactory.getLog(FasterDriver.class);
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new FasterDriver(), args);
+    System.exit(res);
+  }
+
+  static class PartitionedInputResult {
+    final Path _partitionedInputData;
+    final Counters _counters;
+    final long[] _rowIdsFromNewData;
+    final long[] _rowIdsToUpdateFromNewData;
+    final long[] _rowIdsFromIndex;
+
+    PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
+      _partitionedInputData = partitionedInputData;
+      _counters = counters;
+      _rowIdsFromNewData = new long[shards];
+      _rowIdsToUpdateFromNewData = new long[shards];
+      _rowIdsFromIndex = new long[shards];
+      for (TaskReport tr : taskReports) {
+        int id = tr.getTaskID().getId();
+        Counters taskCounters = tr.getTaskCounters();
+        Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+        _rowIdsFromNewData[id] = total.getValue();
+        Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+        _rowIdsToUpdateFromNewData[id] = update.getValue();
+        Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+        _rowIdsFromIndex[id] = index.getValue();
+      }
+    }
+
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int c = 0;
+    if (args.length < 5) {
+      System.err
+          .println("Usage Driver <table> <mr inc working path> <output path> <zk connection> <reducer multipler> <extra config files...>");
+      return 1;
+    }
+    String table = args[c++];
+    String mrIncWorkingPathStr = args[c++];
+    String outputPathStr = args[c++];
+    String blurZkConnection = args[c++];
+    int reducerMultiplier = Integer.parseInt(args[c++]);
+    for (; c < args.length; c++) {
+      String externalConfigFileToAdd = args[c];
+      getConf().addResource(new Path(externalConfigFileToAdd));
+    }
+
+    Path outputPath = new Path(outputPathStr);
+    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
+
+    Path newData = new Path(mrIncWorkingPath, NEW);
+    Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
+    Path completeData = new Path(mrIncWorkingPath, COMPLETE);
+    Path fileCache = new Path(mrIncWorkingPath, CACHE);
+    Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP);
+
+    Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString());
+
+    fileSystem.mkdirs(newData);
+    fileSystem.mkdirs(inprogressData);
+    fileSystem.mkdirs(completeData);
+    fileSystem.mkdirs(fileCache);
+
+    List<Path> srcPathList = new ArrayList<Path>();
+    for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
+      srcPathList.add(fileStatus.getPath());
+    }
+    if (srcPathList.isEmpty()) {
+      return 0;
+    }
+
+    List<Path> inprogressPathList = new ArrayList<Path>();
+    boolean success = false;
+    Iface client = null;
+
+    EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase());
+
+    String uuid = UUID.randomUUID().toString();
+
+    try {
+      client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+      TableDescriptor descriptor = client.describe(table);
+      Map<String, String> tableProperties = descriptor.getTableProperties();
+      String fastDir = tableProperties.get("blur.table.disable.fast.dir");
+      if (fastDir == null || !fastDir.equals("true")) {
+        LOG.error("Table [{0}] has blur.table.disable.fast.dir enabled, not supported in fast MR update.", table);
+        return 1;
+      }
+
+      waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
+      client.createSnapshot(table, MRUPDATE_SNAPSHOT);
+      TableStats tableStats = client.tableStats(table);
+
+      inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
+
+      switch (exec) {
+      case MR_ONLY:
+        success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultiplier);
+        break;
+      case MR_WITH_LOOKUP:
+        success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultiplier,
+            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
+        break;
+      case AUTOMATIC:
+        success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultiplier,
+            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
+        break;
+      default:
+        throw new RuntimeException("Exec type [" + exec + "] not supported.");
+      }
+    } finally {
+      if (success) {
+        LOG.info("Associate lookup cache with new data!");
+        associateLookupCache(uuid, fileCache, outputPath);
+        LOG.info("Indexing job succeeded!");
+        client.loadData(table, outputPathStr);
+        LOG.info("Load data called");
+        movePathList(fileSystem, completeData, inprogressPathList);
+        LOG.info("Input data moved to complete");
+        ClusterDriver.waitForDataToLoad(client, table);
+        LOG.info("Data loaded");
+      } else {
+        LOG.error("Indexing job failed!");
+        movePathList(fileSystem, newData, inprogressPathList);
+      }
+      fileSystem.delete(tmpPath, true);
+      if (client != null) {
+        client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
+      }
+    }
+
+    if (success) {
+      return 0;
+    } else {
+      return 1;
+    }
+  }
+
+  private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException {
+    FileSystem fileSystem = fileCache.getFileSystem(getConf());
+    cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache);
+    associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath);
+  }
+
+  private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException {
+    FileStatus[] listStatus = fileSystem.listStatus(fileCache);
+    List<FileStatus> uuidPaths = new ArrayList<FileStatus>();
+    for (FileStatus fs : listStatus) {
+      Path path = fs.getPath();
+      if (fs.isDirectory()) {
+        cleanupExtraFileFromSpecX(fileSystem, uuid, path);
+      } else if (path.getName().startsWith(uuid)) {
+        uuidPaths.add(fs);
+      }
+    }
+    if (uuidPaths.size() > 1) {
+      deleteIncomplete(fileSystem, uuidPaths);
+    }
+  }
+
+  private void deleteIncomplete(FileSystem fileSystem, List<FileStatus> uuidPaths) throws IOException {
+    long max = 0;
+    FileStatus keeper = null;
+    for (FileStatus fs : uuidPaths) {
+      long len = fs.getLen();
+      if (len > max) {
+        keeper = fs;
+        max = len;
+      }
+    }
+    for (FileStatus fs : uuidPaths) {
+      if (fs != keeper) {
+        LOG.info("Deleteing incomplete cache file [{0}]", fs.getPath());
+        fileSystem.delete(fs.getPath(), false);
+      }
+    }
+  }
+
+  private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath)
+      throws IOException {
+    Path path = fileCache.getPath();
+    if (fileCache.isDirectory()) {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fs : listStatus) {
+        associateLookupCache(fileSystem, uuid, fs, outputPath);
+      }
+    } else if (path.getName().startsWith(uuid)) {
+      Path parent = path.getParent();
+      String shardName = parent.getName();
+      Path indexPath = findOutputDirPath(outputPath, shardName);
+      LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, outputPath);
+      String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), indexPath);
+      Path file = new Path(path.getParent(), id + ".seq");
+      MergeSortRowIdMatcher.commitWriter(getConf(), file, path);
+    }
+  }
+
+  private Path findOutputDirPath(Path outputPath, String shardName) throws IOException {
+    FileSystem fileSystem = outputPath.getFileSystem(getConf());
+    Path shardPath = new Path(outputPath, shardName);
+    if (!fileSystem.exists(shardPath)) {
+      throw new IOException("Shard path [" + shardPath + "]");
+    }
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".commit");          
+      }
+    });
+    if (listStatus.length == 1) {
+      FileStatus fs = listStatus[0];
+      return fs.getPath();
+    } else {
+      throw new IOException("More than one sub dir [" + shardPath + "]");
+    }
+  }
+
+  private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
+      Path fileCache, Path outputPath, int reducerMultiplier, Path tmpPath, TableStats tableStats, String snapshot)
+      throws ClassNotFoundException, IOException, InterruptedException {
+    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
+        fileCache);
+
+    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
+    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
+    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
+    InputSplitPruneUtil.setTable(job, table);
+
+    BlurInputFormat.setLocalCachePath(job, fileCache);
+
+    // Existing data - this input copies the existing data files and streams
+    // through all of the documents.
+    {
+      Path tablePath = new Path(descriptor.getTableUri());
+      BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
+      MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, MapperForExistingDataMod.class);
+    }
+
+    // Existing data - This adds the row id lookup
+    {
+      MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
+      FileInputFormat.addInputPath(job, result._partitionedInputData);
+      MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
+          MapperForExistingDataWithIndexLookup.class);
+    }
+
+    // New Data
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
+    }
+
+    BlurOutputFormat.setOutputPath(job, outputPath);
+    BlurOutputFormat.setupJob(job, descriptor);
+
+    job.setReducerClass(UpdateReducer.class);
+    job.setMapOutputKeyClass(IndexKey.class);
+    job.setMapOutputValueClass(IndexValue.class);
+    job.setPartitionerClass(IndexKeyPartitioner.class);
+    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+    BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
+
+    boolean success = job.waitForCompletion(true);
+    Counters counters = job.getCounters();
+    LOG.info("Counters [" + counters + "]");
+    return success;
+  }
+
+  private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
+      Path fileCache, Path outputPath, int reducerMultiplier, Path tmpPath, TableStats tableStats, String snapshot)
+      throws ClassNotFoundException, IOException, InterruptedException {
+    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
+        fileCache);
+
+    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+    MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
+    FileInputFormat.addInputPath(job, result._partitionedInputData);
+    MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
+        MapperForExistingDataWithIndexLookup.class);
+
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
+    }
+
+    BlurOutputFormat.setOutputPath(job, outputPath);
+    BlurOutputFormat.setupJob(job, descriptor);
+
+    job.setReducerClass(UpdateReducer.class);
+    job.setMapOutputKeyClass(IndexKey.class);
+    job.setMapOutputValueClass(IndexValue.class);
+    job.setPartitionerClass(IndexKeyPartitioner.class);
+    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+    BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
+
+    boolean success = job.waitForCompletion(true);
+    Counters counters = job.getCounters();
+    LOG.info("Counters [" + counters + "]");
+    return success;
+  }
+
+  private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
+      Path outputPath, int reducerMultiplier) throws IOException, ClassNotFoundException, InterruptedException {
+    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+    Path tablePath = new Path(descriptor.getTableUri());
+    BlurInputFormat.setLocalCachePath(job, fileCache);
+    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
+    MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingDataMod.class);
+
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
+    }
+
+    BlurOutputFormat.setOutputPath(job, outputPath);
+    BlurOutputFormat.setupJob(job, descriptor);
+
+    job.setReducerClass(UpdateReducer.class);
+    job.setMapOutputKeyClass(IndexKey.class);
+    job.setMapOutputValueClass(IndexValue.class);
+    job.setPartitionerClass(IndexKeyPartitioner.class);
+    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+    BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
+
+    boolean success = job.waitForCompletion(true);
+    Counters counters = job.getCounters();
+    LOG.info("Counters [" + counters + "]");
+    return success;
+  }
+
+  private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
+      List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
+      InterruptedException {
+    Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
+    job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
+
+    // Needed for the bloom filter path information.
+    BlurOutputFormat.setTableDescriptor(job, descriptor);
+    BlurInputFormat.setLocalCachePath(job, fileCachePath);
+    MapperForExistingDataWithIndexLookup.setSnapshot(job, snapshot);
+
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+    }
+    Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
+    job.setJarByClass(getClass());
+    job.setMapperClass(LookupBuilderMapper.class);
+    job.setReducerClass(LookupBuilderReducer.class);
+
+    int shardCount = descriptor.getShardCount();
+    job.setNumReduceTasks(shardCount);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BooleanWritable.class);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    if (job.waitForCompletion(true)) {
+      return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
+    } else {
+      throw new IOException("Partitioning failed!");
+    }
+  }
+
+  private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
+      TException, InterruptedException {
+    while (true) {
+      Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+      boolean mrupdateSnapshots = false;
+      for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+        List<String> value = e.getValue();
+        if (value.contains(snapshot)) {
+          mrupdateSnapshots = true;
+        }
+      }
+      if (!mrupdateSnapshots) {
+        return;
+      } else {
+        LOG.info(snapshot + " Snapshot for table [{0}] already exists", table);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        LOG.info("Retrying");
+      }
+    }
+  }
+
+  private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    for (Path src : lst) {
+      Path dst = new Path(dstDir, src.getName());
+      if (fileSystem.rename(src, dst)) {
+        LOG.info("Moving [{0}] to [{1}]", src, dst);
+        result.add(dst);
+      } else {
+        LOG.error("Could not move [{0}] to [{1}]", src, dst);
+      }
+    }
+    return result;
+  }
+
+}
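
FasterDriver picks its execution strategy from the blur.exec.type
property (values from the EXEC enum above; AUTOMATIC is the default). A
minimal sketch of forcing the lookup-based path, with hypothetical table
and path arguments:

    // Hypothetical table name, working/output paths, and ZooKeeper string.
    Configuration conf = new Configuration();
    conf.set("blur.exec.type", "MR_WITH_LOOKUP"); // or MR_ONLY / AUTOMATIC
    int rc = ToolRunner.run(conf, new FasterDriver(), new String[] {
        "table1", "hdfs:///blur/mrinc", "hdfs:///blur/import/out", "zk1:2181/blur", "2" });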

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
new file mode 100644
index 0000000..34d3e99
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/HdfsConfigurationNamespaceMerge.java
@@ -0,0 +1,115 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HdfsConfigurationNamespaceMerge {
+
+  private static final String DFS_NAMESERVICES = "dfs.nameservices";
+  private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class);
+
+  public static void main(String[] args) throws IOException {
+    Path p = new Path("./src/main/scripts/conf/hdfs");
+
+    Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p);
+
+    // configuration.writeXml(System.out);
+
+    Collection<String> nameServices = configuration.getStringCollection(DFS_NAMESERVICES);
+    for (String name : nameServices) {
+      Path path = new Path("hdfs://" + name + "/");
+      FileSystem fileSystem = path.getFileSystem(configuration);
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        System.out.println(fileStatus.getPath());
+      }
+    }
+  }
+
+  private static boolean checkHostName(String host) {
+    try {
+      InetAddress.getAllByName(host);
+      return true;
+    } catch (UnknownHostException e) {
+      LOG.warn("Host not found [" + host + "]");
+      return false;
+    }
+  }
+
+  public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException {
+    List<Configuration> configList = new ArrayList<Configuration>();
+    gatherConfigs(fs, p, configList);
+    return merge(configList);
+  }
+
+  public static Configuration merge(List<Configuration> configList) throws IOException {
+    Configuration merge = new Configuration(false);
+    Set<String> nameServices = new HashSet<String>();
+    for (Configuration configuration : configList) {
+      String nameService = configuration.get(DFS_NAMESERVICES);
+      if (nameServices.contains(nameService)) {
+        throw new IOException("Multiple confs define namespace [" + nameService + "]");
+      }
+      nameServices.add(nameService);
+      if (shouldAdd(configuration, nameService)) {
+        for (Entry<String, String> e : configuration) {
+          String key = e.getKey();
+          if (key.contains(nameService)) {
+            String value = e.getValue();
+            merge.set(key, value);
+          }
+        }
+      }
+    }
+    merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ","));
+    return merge;
+  }
+
+  private static boolean shouldAdd(Configuration configuration, String nameService) {
+    for (Entry<String, String> e : configuration) {
+      String key = e.getKey();
+      if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) {
+        return checkHostName(getHost(e.getValue()));
+      }
+    }
+    return false;
+  }
+
+  private static String getHost(String host) {
+    return host.contains(":") ? host.substring(0, host.indexOf(":")) : host;
+  }
+
+  public static void gatherConfigs(FileSystem fs, Path p, List<Configuration> configList) throws IOException {
+    if (fs.isFile(p)) {
+      if (p.getName().endsWith(".xml")) {
+        LOG.info("Loading file [" + p + "]");
+        Configuration configuration = new Configuration(false);
+        configuration.addResource(p);
+        configList.add(configuration);
+      } else {
+        LOG.info("Skipping file [" + p + "]");
+      }
+    } else {
+      FileStatus[] listStatus = fs.listStatus(p);
+      for (FileStatus fileStatus : listStatus) {
+        gatherConfigs(fs, fileStatus.getPath(), configList);
+      }
+    }
+  }
+
+}
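
mergeHdfsConfigs walks a directory tree of *.xml files, keeps only the
properties scoped to each file's dfs.nameservices entry (and only when
that nameservice's namenode host resolves), and joins the results into
one Configuration so a single job can address several HDFS clusters. A
minimal usage sketch, assuming a hypothetical directory of per-cluster
hdfs-site.xml files:

    Path confDir = new Path("/etc/hadoop/clusters"); // hypothetical location
    FileSystem fs = confDir.getFileSystem(new Configuration());
    Configuration merged = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(fs, confDir);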

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
new file mode 100644
index 0000000..e295073
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/InputSplitPruneUtil.java
@@ -0,0 +1,133 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+public class InputSplitPruneUtil {
+
+  private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count.";
+  private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count.";
+  private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count.";
+
+  private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table";
+  private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard";
+  private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard";
+
+  private static final double DEFAULT_LOOKUP_RATIO = 0.5;
+  private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE;
+
+  public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) {
+    double lookupRatio = getLookupRatio(configuration);
+    long maxLookupCount = getMaxLookupCount(configuration);
+    long rowIdFromNewDataCount = getBlurLookupRowIdFromNewDataCount(configuration, table, shard);
+    long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard);
+    long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+    return shouldLookupRun(rowIdFromIndexCount, rowIdFromNewDataCount, rowIdUpdateFromNewDataCount, lookupRatio,
+        maxLookupCount);
+  }
+
+  private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdFromNewDataCount,
+      long rowIdUpdateFromNewDataCount, double lookupRatio, long maxLookupCount) {
+    if (rowIdUpdateFromNewDataCount > maxLookupCount) {
+      return false;
+    }
+    double d = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount;
+    if (d <= lookupRatio) {
+      return true;
+    }
+    return false;
+  }
+
+  public static double getLookupRatio(Configuration configuration) {
+    return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO);
+  }
+
+  private static long getMaxLookupCount(Configuration configuration) {
+    return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL);
+  }
+
+  public static void setTable(Job job, String table) {
+    setTable(job.getConfiguration(), table);
+  }
+
+  public static void setTable(Configuration configuration, String table) {
+    configuration.set(BLUR_LOOKUP_TABLE, table);
+  }
+
+  public static String getTable(Configuration configuration) {
+    return configuration.get(BLUR_LOOKUP_TABLE);
+  }
+
+  public static String getBlurLookupRowIdFromIndexCountName(String table) {
+    return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table;
+  }
+
+  public static String getBlurLookupRowIdFromNewDataCountName(String table) {
+    return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table;
+  }
+
+  public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) {
+    return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table;
+  }
+
+  public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) {
+    String[] strings = configuration.getStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table));
+    return getCount(strings, shard);
+  }
+
+  public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) {
+    String[] strings = configuration.getStrings(getBlurLookupRowIdFromNewDataCountName(table));
+    return getCount(strings, shard);
+  }
+
+  public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) {
+    String[] strings = configuration.getStrings(getBlurLookupRowIdFromIndexCountName(table));
+    return getCount(strings, shard);
+  }
+
+  public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) {
+    setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts);
+  }
+
+  public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) {
+    configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts));
+  }
+
+  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) {
+    setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts);
+  }
+
+  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) {
+    configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts));
+  }
+
+  public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) {
+    setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts);
+  }
+
+  public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) {
+    configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts));
+  }
+
+  public static long getCount(String[] strings, int shard) {
+    return Long.parseLong(strings[shard]);
+  }
+
+  public static int getShardFromDirectoryPath(Path path) {
+    return ShardUtil.getShardIndex(path.getName());
+  }
+
+  public static String[] toStrings(long[] counts) {
+    if (counts == null) {
+      return null;
+    }
+    String[] strs = new String[counts.length];
+    for (int i = 0; i < counts.length; i++) {
+      strs[i] = Long.toString(counts[i]);
+    }
+    return strs;
+  }
+
+}
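
The counts produced by the partitioning job are stored as per-shard
string arrays in the job configuration and read back by the pruned
input formats to decide, shard by shard, whether the index lookup is
worth running. A round-trip sketch with hypothetical two-shard counts:

    Configuration conf = new Configuration();
    InputSplitPruneUtil.setTable(conf, "table1");
    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(conf, "table1", new long[] { 500, 800 });
    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(conf, "table1", new long[] { 50, 700 });
    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(conf, "table1", new long[] { 10000, 1000 });
    // Shard 0: 50/10000 = 0.005 <= 0.5 (default ratio), so the lookup runs.
    boolean shard0 = InputSplitPruneUtil.shouldLookupExecuteOnShard(conf, "table1", 0);
    // Shard 1: 700/1000 = 0.7 > 0.5, so the lookup is skipped for that shard.
    boolean shard1 = InputSplitPruneUtil.shouldLookupExecuteOnShard(conf, "table1", 1);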

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
new file mode 100644
index 0000000..ac0d91f
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderMapper.java
@@ -0,0 +1,18 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class LookupBuilderMapper extends Mapper<Text, BlurRecord, Text, NullWritable> {
+
+  @Override
+  protected void map(Text key, BlurRecord value, Mapper<Text, BlurRecord, Text, NullWritable>.Context context)
+      throws IOException, InterruptedException {
+    context.write(new Text(value.getRowId()), NullWritable.get());
+  }
+
+}
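
The mapper above only projects each BlurRecord down to its row id, so
the shuffle groups the new data by row id for the per-shard reducers;
the wiring below mirrors buildPartitionedInputData in FasterDriver:

    job.setMapperClass(LookupBuilderMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(LookupBuilderReducer.class);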

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
new file mode 100644
index 0000000..1983cae
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/LookupBuilderReducer.java
@@ -0,0 +1,165 @@
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.update.MergeSortRowIdMatcher.Action;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {
+
+  public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";
+  private Counter _rowIds;
+  private Counter _rowIdsToUpdate;
+
+  private MergeSortRowIdMatcher _matcher;
+  private int _numberOfShardsInTable;
+  private Configuration _configuration;
+  private String _snapshot;
+  private Path _tablePath;
+  private Counter _rowIdsFromIndex;
+  private long _totalNumberOfBytes;
+  private Action _action;
+  private Closer _closer;
+  private Path _cachePath;
+  private String _table;
+  private Writer _writer;
+
+  @Override
+  protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    _configuration = context.getConfiguration();
+    _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+    _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+    _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    _numberOfShardsInTable = tableDescriptor.getShardCount();
+    _tablePath = new Path(tableDescriptor.getTableUri());
+    _snapshot = MapperForExistingDataWithIndexLookup.getSnapshot(_configuration);
+    _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+    _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
+    _table = tableDescriptor.getName();
+    _closer = Closer.create();
+  }
+
+  @Override
+  protected void reduce(Text rowId, Iterable<NullWritable> nothing,
+      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException, InterruptedException {
+    if (_matcher == null) {
+      _matcher = getMergeSortRowIdMatcher(rowId, context);
+    }
+    if (_writer == null) {
+      _writer = getRowIdWriter(rowId, context);
+    }
+    _writer.append(rowId, NullWritable.get());
+    _rowIds.increment(1);
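+    // Note: Hadoop passes the same Context instance to every reduce() call in a
+    // task, so capturing the first call's context in this Action is safe.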
+    if (_action == null) {
+      _action = new Action() {
+        @Override
+        public void found(Text rowId) throws IOException {
+          _rowIdsToUpdate.increment(1);
+          try {
+            context.write(rowId, new BooleanWritable(true));
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+        }
+      };
+    }
+    _matcher.lookup(rowId, _action);
+  }
+
+  private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
+      throws IOException {
+    BlurPartitioner blurPartitioner = new BlurPartitioner();
+    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    String shardName = ShardUtil.getShardName(shard);
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    Configuration configuration = context.getConfiguration();
+    String uuid = configuration.get(FasterDriver.BLUR_UPDATE_ID);
+    Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
+    return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
+  }
+
+  private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    return taskAttemptID.toString();
+  }
+
+  @Override
+  protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
+      InterruptedException {
+    _closer.close();
+  }
+
+  private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
+      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
+    BlurPartitioner blurPartitioner = new BlurPartitioner();
+    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    String shardName = ShardUtil.getShardName(shard);
+
+    Path shardPath = new Path(_tablePath, shardName);
+    HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+        SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+    Long generation = policy.getGeneration(_snapshot);
+    if (generation == null) {
+      hdfsDirectory.close();
+      throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+    }
+
+    BlurConfiguration bc = new BlurConfiguration();
+    BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+        _totalNumberOfBytes);
+    _closer.register(blockCacheDirectoryFactoryV2);
+    Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+    List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+    IndexCommit indexCommit = MapperForExistingDataWithIndexLookup.findIndexCommit(listCommits, generation, shardPath);
+    // Register with the closer so cleanup() releases the reader.
+    DirectoryReader reader = _closer.register(DirectoryReader.open(indexCommit));
+    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
+
+    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
+    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
+  }
+
+  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
+    long total = 0;
+    List<AtomicReaderContext> leaves = reader.leaves();
+    for (AtomicReaderContext context : leaves) {
+      AtomicReader atomicReader = context.reader();
+      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
+      if (terms == null) {
+        // A segment without a rowid field contributes nothing to the count.
+        continue;
+      }
+      long expectedInsertions = terms.size();
+      if (expectedInsertions < 0) {
+        return -1;
+      }
+      total += expectedInsertions;
+    }
+    return total;
+  }
+}
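
Worth noting: the reducer lazily builds one MergeSortRowIdMatcher and one writer per task from the first rowId it sees, so every key routed to a given reduce task must belong to the same shard. A partitioner mirroring BlurPartitioner would uphold that invariant; this sketch is an assumption, not code from the commit:

    import org.apache.blur.manager.BlurPartitioner;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ShardAlignedPartitioner extends Partitioner<Text, NullWritable> {
      private final BlurPartitioner _blurPartitioner = new BlurPartitioner();

      @Override
      public int getPartition(Text key, NullWritable value, int numPartitions) {
        // Route each rowId to the partition matching its Blur shard.
        return _blurPartitioner.getShard(key, numPartitions);
      }
    }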

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
new file mode 100644
index 0000000..bf86e19
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataMod.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.TableBlurRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class MapperForExistingDataMod extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
+
+  private Counter _existingRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
+    counter.increment(1);
+    _existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
+  }
+
+  @Override
+  protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+    BlurRecord blurRecord = value.getBlurRecord();
+    IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
+    context.write(oldDataKey, new IndexValue(blurRecord));
+    _existingRecords.increment(1L);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
new file mode 100644
index 0000000..0e2fe66
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingDataWithIndexLookup.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class MapperForExistingDataWithIndexLookup extends Mapper<Text, BooleanWritable, IndexKey, IndexValue> {
+
+  private static final Log LOG = LogFactory.getLog(MapperForExistingDataWithIndexLookup.class);
+
+  private static final String BLUR_SNAPSHOT = "blur.snapshot";
+  private Counter _existingRecords;
+  private Counter _rowLookup;
+  private BlurPartitioner _blurPartitioner;
+  private Path _tablePath;
+  private int _numberOfShardsInTable;
+  private Configuration _configuration;
+  private String _snapshot;
+
+  private int _indexShard = -1;
+  private DirectoryReader _reader;
+  private IndexSearcher _indexSearcher;
+  private long _totalNumberOfBytes;
+  private Closer _closer;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
+    counter.increment(1);
+
+    _configuration = context.getConfiguration();
+    _existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
+    _rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
+    _blurPartitioner = new BlurPartitioner();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    _numberOfShardsInTable = tableDescriptor.getShardCount();
+    _tablePath = new Path(tableDescriptor.getTableUri());
+    _snapshot = getSnapshot(_configuration);
+    _totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+    _closer = Closer.create();
+  }
+
+  @Override
+  protected void map(Text key, BooleanWritable value, Context context) throws IOException, InterruptedException {
+    if (value.get()) {
+      String rowId = key.toString();
+      LOG.debug("Looking up rowid [" + rowId + "]");
+      _rowLookup.increment(1);
+      IndexSearcher indexSearcher = getIndexSearcher(rowId);
+      Term term = new Term(BlurConstants.ROW_ID, rowId);
+      RowCollector collector = getCollector(context);
+      indexSearcher.search(new TermQuery(term), collector);
+      LOG.debug("Looking for rowid [" + rowId + "] has [" + collector.records + "] records");
+    }
+  }
+
+  @Override
+  protected void cleanup(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) throws IOException,
+      InterruptedException {
+    _closer.close();
+  }
+
+  static class RowCollector extends Collector {
+
+    private AtomicReader reader;
+    private Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context _context;
+    private Counter _existingRecords;
+    int records;
+
+    RowCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context, Counter existingRecords) {
+      _context = context;
+      _existingRecords = existingRecords;
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+
+    }
+
+    @Override
+    public void setNextReader(AtomicReaderContext context) throws IOException {
+      reader = context.reader();
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      Document document = reader.document(doc);
+      FetchRecordResult result = RowDocumentUtil.getRecord(document);
+      String rowid = result.getRowid();
+      Record record = result.getRecord();
+      String recordId = record.getRecordId();
+      IndexKey oldDataKey = IndexKey.oldData(rowid, recordId);
+      try {
+        _context.write(oldDataKey, new IndexValue(toBlurRecord(rowid, record)));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      _existingRecords.increment(1L);
+      // Track per-lookup hits so the debug log in map() reports a real count.
+      records++;
+    }
+
+    private BlurRecord toBlurRecord(String rowId, Record record) {
+      BlurRecord blurRecord = new BlurRecord();
+      blurRecord.setRowId(rowId);
+      blurRecord.setRecordId(record.getRecordId());
+      blurRecord.setFamily(record.getFamily());
+      List<Column> columns = record.getColumns();
+      for (Column column : columns) {
+        blurRecord.addColumn(column.getName(), column.getValue());
+      }
+      return blurRecord;
+    }
+
+    @Override
+    public boolean acceptsDocsOutOfOrder() {
+      return false;
+    }
+  }
+
+  private RowCollector getCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) {
+    return new RowCollector(context, _existingRecords);
+  }
+
+  private IndexSearcher getIndexSearcher(String rowId) throws IOException {
+    int shard = _blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    if (_indexSearcher != null) {
+      if (shard != _indexShard) {
+        throw new IOException("Input data is not partitioned correctly.");
+      }
+      return _indexSearcher;
+    } else {
+      _indexShard = shard;
+      Path shardPath = new Path(_tablePath, ShardUtil.getShardName(_indexShard));
+      HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+      SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+          SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+      Long generation = policy.getGeneration(_snapshot);
+      if (generation == null) {
+        hdfsDirectory.close();
+        throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+      }
+
+      BlurConfiguration bc = new BlurConfiguration();
+      BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+          _totalNumberOfBytes);
+      _closer.register(blockCacheDirectoryFactoryV2);
+      Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+
+      List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+      IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardPath);
+      // Register with the closer so cleanup() releases the reader.
+      _reader = _closer.register(DirectoryReader.open(indexCommit));
+      return _indexSearcher = new IndexSearcher(_reader);
+    }
+  }
+
+  public static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
+      throws IOException {
+    for (IndexCommit commit : listCommits) {
+      if (commit.getGeneration() == generation) {
+        return commit;
+      }
+    }
+    throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir + "]");
+  }
+
+  public static void setSnapshot(Job job, String snapshot) {
+    setSnapshot(job.getConfiguration(), snapshot);
+  }
+
+  public static void setSnapshot(Configuration configuration, String snapshot) {
+    configuration.set(BLUR_SNAPSHOT, snapshot);
+  }
+
+  public static String getSnapshot(Configuration configuration) {
+    return configuration.get(BLUR_SNAPSHOT);
+  }
+}
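
The setSnapshot/getSnapshot pair above is how a driver tells this mapper (and LookupBuilderReducer) which index snapshot to read. A minimal usage sketch; the job and snapshot names are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotSetupSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "blur-update");
        // "update-snapshot" is a placeholder name for illustration.
        MapperForExistingDataWithIndexLookup.setSnapshot(job, "update-snapshot");
      }
    }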

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
new file mode 100644
index 0000000..d91d1f5
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewDataMod.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class MapperForNewDataMod extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
+
+  private static final IndexValue EMPTY_RECORD = new IndexValue();
+  private long _timestamp;
+  private Counter _newRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = getFileSplit(inputSplit);
+    Path path = fileSplit.getPath();
+    FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    _timestamp = fileStatus.getModificationTime();
+    _newRecords = context.getCounter(BlurIndexCounter.NEW_RECORDS);
+  }
+
+  private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
+    if (inputSplit instanceof FileSplit) {
+      return (FileSplit) inputSplit;
+    }
+    if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
+      try {
+        Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
+        declaredField.setAccessible(true);
+        return getFileSplit((InputSplit) declaredField.get(inputSplit));
+      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
+          | IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    } else {
+      throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
+    }
+  }
+
+  @Override
+  protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
+    IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
+    context.write(newDataKey, new IndexValue(blurRecord));
+    _newRecords.increment(1L);
+
+    IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
+    context.write(newDataMarker, EMPTY_RECORD);
+  }
+
+}
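
The reflection in getFileSplit exists because Hadoop's TaggedInputSplit class is package-private; it wraps the real FileSplit whenever a job mixes inputs through MultipleInputs. A hedged sketch of that kind of wiring (the actual driver code is not in this excerpt):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class NewDataInputSketch {
      public static void addNewData(Job job, Path newDataPath) {
        // Each added path is wrapped in a TaggedInputSplit at runtime,
        // which is why MapperForNewDataMod unwraps the split reflectively.
        MultipleInputs.addInputPath(job, newDataPath,
            SequenceFileInputFormat.class, MapperForNewDataMod.class);
      }
    }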


Re: [03/13] git commit: Third round of updates.

Posted by Aaron McCurry <am...@gmail.com>.
Will do.  :-)

On Tue, Aug 30, 2016 at 9:10 AM, Tim Williams <wi...@gmail.com> wrote:

> No worries, just a friendly reminder :)  If you get time, I think it'd
> be helpful to add a couple of sentences about any new stuff/big changes...
> seems like there's a new project, for example...
>
> Thanks,
> --tim
>
>
> On Tue, Aug 30, 2016 at 7:49 AM, Aaron McCurry <am...@gmail.com> wrote:
> > I apologize for the big commits without proper messaging.  It was
> > difficult to remember the changes, and the original commit messages were
> > lost due to an offline git repo (which is no longer in use).  I only had
> > the diff between the original git repo and everything after the changes.
> > Plus the diff didn't apply cleanly, so that's why I broke it up into
> > different sections.
> >
> > I suppose I should have broken the changes up manually out of the diff,
> > applied them separately, and recreated all the commit messages, but I
> > didn't have the time to work through all of them.  Sorry.
> >
> > Aaron
> >
> >
> > On Tuesday, August 30, 2016, Tim Williams <wi...@gmail.com> wrote:
> >
> >> Please be more considerate with your
> >> commit messages... it's a lot of code to look through without having
> >> any context besides "N round of updates."
> >>
> >>
> >> On Mon, Aug 29, 2016 at 9:57 PM,  <amccurry@apache.org <javascript:;>>
> >> wrote:
> >> > Third round of updates.
> >> >
> >> > [snip]
> >> >
> >> > +    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
> >>
> >> Not sure what this thing does yet, but it seems we should validate
> >> those args since they're accessed blindly in run...
> >>
> >> --tim
> >>
>
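
A sketch of the validation Tim suggests; the expected arguments and the runCommand helper are hypothetical, since ClusterDriver.run() is not shown in this excerpt:

    @Override
    public int run(String[] args) throws Exception {
      // Hypothetical guard; the real argument list is defined in ClusterDriver.run().
      if (args == null || args.length < 1) {
        System.err.println("Usage: ClusterDriver <command> [options...]");
        return 1;
      }
      return runCommand(args); // hypothetical dispatch helper
    }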

Re: [03/13] git commit: Third round of updates.

Posted by Tim Williams <wi...@gmail.com>.
No worries, just a friendly reminder :)  If you get time, I think it'd
be helpful to add a couple of sentences about any new stuff/big changes...
seems like there's a new project, for example...

Thanks,
--tim


On Tue, Aug 30, 2016 at 7:49 AM, Aaron McCurry <am...@gmail.com> wrote:
> I apologize for the big commits without proper messaging.  It was difficult
> to remember the changes, and the original commit messages were lost due to an
> offline git repo (which is no longer in use).  I only had the diff between
> the original git repo and everything after the changes.  Plus the diff
> didn't apply cleanly, so that's why I broke it up into different sections.
>
> I suppose I should have broken the changes up manually out of the diff,
> applied them separately, and recreated all the commit messages, but I didn't
> have the time to work through all of them.  Sorry.
>
> Aaron
>
>
> On Tuesday, August 30, 2016, Tim Williams <wi...@gmail.com> wrote:
>
>> Please be more considerate with your
>> commit messages... it's a lot of code to look through without having
>> any context besides "N round of updates."
>>
>>
>> On Mon, Aug 29, 2016 at 9:57 PM,  <amccurry@apache.org <javascript:;>>
>> wrote:
>> > Third round of updates.
>> >
>> > [snip]
>> >
>> > +    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
>>
>> Not sure what this thing does yet, but it seems we should validate
>> those args since they're accessed blindly in run...
>>
>> --tim
>>

Re: [03/13] git commit: Third round of updates.

Posted by Aaron McCurry <am...@gmail.com>.
I apologize for the big commits without proper messaging.  It was difficult
to remember the changes, and the original commit messages were lost due to an
offline git repo (which is no longer in use).  I only had the diff between
the original git repo and everything after the changes.  Plus the diff
didn't apply cleanly, so that's why I broke it up into different sections.

I suppose I should have broken the changes up manually out of the diff,
applied them separately, and recreated all the commit messages, but I didn't
have the time to work through all of them.  Sorry.

Aaron


On Tuesday, August 30, 2016, Tim Williams <wi...@gmail.com> wrote:

> Please be more considerate with your
> commit messages... it's a lot of code to look through without having
> any context besides "N round of updates."
>
>
> On Mon, Aug 29, 2016 at 9:57 PM,  <amccurry@apache.org <javascript:;>>
> wrote:
> > Third round of updates.
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/
> commit/ea50630a
> > Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/
> ea50630a
> > Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/
> ea50630a
> >
> > Branch: refs/heads/master
> > Commit: ea50630a38d67675a61a916b144f3c0ce85d7f7a
> > Parents: 0141656
> > Author: Aaron McCurry <amccurry@gmail.com <javascript:;>>
> > Authored: Sat May 7 13:11:54 2016 -0400
> > Committer: Aaron McCurry <amccurry@gmail.com <javascript:;>>
> > Committed: Sat May 7 13:11:54 2016 -0400
> >
> > ----------------------------------------------------------------------
> >  blur-indexer/pom.xml                            |  58 +++
> >  blur-indexer/src/main/assemble/bin.xml          |  45 ++
> >  .../mapreduce/lib/update/BlurIndexCounter.java  |  17 +
> >  .../mapreduce/lib/update/ClusterDriver.java     | 362 ++++++++++++++
> >  .../blur/mapreduce/lib/update/FasterDriver.java | 486
> +++++++++++++++++++
> >  .../update/HdfsConfigurationNamespaceMerge.java | 115 +++++
> >  .../lib/update/InputSplitPruneUtil.java         | 133 +++++
> >  .../lib/update/LookupBuilderMapper.java         |  18 +
> >  .../lib/update/LookupBuilderReducer.java        | 165 +++++++
> >  .../lib/update/MapperForExistingDataMod.java    |  46 ++
> >  .../MapperForExistingDataWithIndexLookup.java   | 228 +++++++++
> >  .../lib/update/MapperForNewDataMod.java         |  82 ++++
> >  .../lib/update/MergeSortRowIdMatcher.java       | 372 ++++++++++++++
> >  .../lib/update/PrunedBlurInputFormat.java       |  57 +++
> >  .../update/PrunedSequenceFileInputFormat.java   |  59 +++
> >  .../src/main/resources/blur-site.properties     |   1 +
> >  .../src/main/resources/program-log4j.xml        |  29 ++
> >  blur-indexer/src/main/resources/test-log4j.xml  |  46 ++
> >  18 files changed, 2319 insertions(+)
> > ----------------------------------------------------------------------
> >
> >
> > [...diff snipped down to the ClusterDriver.main() excerpt that Tim is
> > commenting on below; the full diff is quoted in his original message
> > further down...]
> >
> > +  public static void main(String[] args) throws Exception {
> > +    String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE");
> > +    System.out.println("Log file path [" + logFilePath + "]");
> > +    System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath);
> > +    URL url = ClusterDriver.class.getResource("/program-log4j.xml");
> > +    if (url != null) {
> > +      LOG.info("Reseting log4j config from classpath resource [{0}]", url);
> > +      LogManager.resetConfiguration();
> > +      DOMConfigurator.configure(url);
> > +    }
> > +    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);
>
> Not sure what this thing does yet, but it seems we should validate
> those args since they're accessed blindly in run...
>
> --tim
>

Re: [04/13] git commit: Third round of updates.

Posted by Tim Williams <wi...@gmail.com>.
Please be more considerate with your commit messages... it's a lot of
code to look through without having any context besides "N round of
updates."


On Mon, Aug 29, 2016 at 9:57 PM,  <am...@apache.org> wrote:
> Third round of updates.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ea50630a
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ea50630a
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ea50630a
>
> Branch: refs/heads/master
> Commit: ea50630a38d67675a61a916b144f3c0ce85d7f7a
> Parents: 0141656
> Author: Aaron McCurry <am...@gmail.com>
> Authored: Sat May 7 13:11:54 2016 -0400
> Committer: Aaron McCurry <am...@gmail.com>
> Committed: Sat May 7 13:11:54 2016 -0400
>
> ----------------------------------------------------------------------
>  blur-indexer/pom.xml                            |  58 +++
>  blur-indexer/src/main/assemble/bin.xml          |  45 ++
>  .../mapreduce/lib/update/BlurIndexCounter.java  |  17 +
>  .../mapreduce/lib/update/ClusterDriver.java     | 362 ++++++++++++++
>  .../blur/mapreduce/lib/update/FasterDriver.java | 486 +++++++++++++++++++
>  .../update/HdfsConfigurationNamespaceMerge.java | 115 +++++
>  .../lib/update/InputSplitPruneUtil.java         | 133 +++++
>  .../lib/update/LookupBuilderMapper.java         |  18 +
>  .../lib/update/LookupBuilderReducer.java        | 165 +++++++
>  .../lib/update/MapperForExistingDataMod.java    |  46 ++
>  .../MapperForExistingDataWithIndexLookup.java   | 228 +++++++++
>  .../lib/update/MapperForNewDataMod.java         |  82 ++++
>  .../lib/update/MergeSortRowIdMatcher.java       | 372 ++++++++++++++
>  .../lib/update/PrunedBlurInputFormat.java       |  57 +++
>  .../update/PrunedSequenceFileInputFormat.java   |  59 +++
>  .../src/main/resources/blur-site.properties     |   1 +
>  .../src/main/resources/program-log4j.xml        |  29 ++
>  blur-indexer/src/main/resources/test-log4j.xml  |  46 ++
>  18 files changed, 2319 insertions(+)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/pom.xml
> ----------------------------------------------------------------------
> diff --git a/blur-indexer/pom.xml b/blur-indexer/pom.xml
> new file mode 100644
> index 0000000..c7c1753
> --- /dev/null
> +++ b/blur-indexer/pom.xml
> @@ -0,0 +1,58 @@
> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> +       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> +       <modelVersion>4.0.0</modelVersion>
> +       <groupId>org.apache.blur</groupId>
> +       <artifactId>blur-indexer</artifactId>
> +       <version>0.2.8</version>
> +       <name>blur-indexer</name>
> +       <packaging>jar</packaging>
> +
> +       <properties>
> +               <blur.version>0.3.0.incubating.2.5.0.cdh5.3.3-SNAPSHOT</blur.version>
> +       </properties>
> +       <dependencies>
> +               <dependency>
> +                       <groupId>org.apache.blur</groupId>
> +                       <artifactId>blur-mapred</artifactId>
> +                       <version>${blur.version}</version>
> +               </dependency>
> +               <dependency>
> +                       <groupId>junit</groupId>
> +                       <artifactId>junit</artifactId>
> +                       <version>4.9</version>
> +                       <scope>test</scope>
> +               </dependency>
> +       </dependencies>
> +
> +       <build>
> +               <pluginManagement>
> +                       <plugins>
> +                               <plugin>
> +                                       <groupId>org.apache.maven.plugins</groupId>
> +                                       <artifactId>maven-compiler-plugin</artifactId>
> +                                       <configuration>
> +                                               <source>1.8</source>
> +                                               <target>1.8</target>
> +                                       </configuration>
> +                               </plugin>
> +                       </plugins>
> +               </pluginManagement>
> +               <plugins>
> +                       <plugin>
> +                               <artifactId>maven-assembly-plugin</artifactId>
> +                               <configuration>
> +                                       <descriptor>src/main/assemble/bin.xml</descriptor>
> +                                       <finalName>blur-indexer-${project.version}</finalName>
> +                               </configuration>
> +                               <executions>
> +                                       <execution>
> +                                               <phase>package</phase>
> +                                               <goals>
> +                                                       <goal>single</goal>
> +                                               </goals>
> +                                       </execution>
> +                               </executions>
> +                       </plugin>
> +               </plugins>
> +       </build>
> +</project>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/assemble/bin.xml
> ----------------------------------------------------------------------
> diff --git a/blur-indexer/src/main/assemble/bin.xml b/blur-indexer/src/main/assemble/bin.xml
> new file mode 100644
> index 0000000..5fddd56
> --- /dev/null
> +++ b/blur-indexer/src/main/assemble/bin.xml
> @@ -0,0 +1,45 @@
> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
> +    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> +           xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
> +  <formats>
> +    <format>tar.gz</format>
> +  </formats>
> +  <includeBaseDirectory>false</includeBaseDirectory>
> +
> +  <dependencySets>
> +    <dependencySet>
> +      <useProjectArtifact>true</useProjectArtifact>
> +      <outputDirectory>blur-indexer-${project.version}/lib</outputDirectory>
> +      <unpack>false</unpack>
> +      <includes>
> +        <include>org.apache.blur:blur-indexer</include>
> +        <include>org.apache.blur:*</include>
> +        <include>org.apache.zookeeper:zookeeper</include>
> +        <include>org.slf4j:slf4j-api</include>
> +        <include>org.slf4j:slf4j-log4j12</include>
> +        <include>org.json:json</include>
> +        <include>log4j:log4j</include>
> +        <include>com.yammer.metrics:*</include>
> +        <include>com.google.guava:guava</include>
> +        <include>org.apache.httpcomponents:*</include>
> +        <include>org.apache.lucene:*</include>
> +        <include>com.spatial4j:spatial4j</include>
> +        <include>commons-cli:commons-cli</include>
> +        <include>org.eclipse.jetty:*</include>
> +        <include>com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru</include>
> +        <include>jline:jline</include>
> +        <include>com.fasterxml.jackson.core:*</include>
> +      </includes>
> +    </dependencySet>
> +  </dependencySets>
> +
> +  <fileSets>
> +    <fileSet>
> +      <directory>${project.build.scriptSourceDirectory}</directory>
> +      <outputDirectory>blur-indexer-${project.version}</outputDirectory>
> +      <excludes>
> +        <exclude>**/.empty</exclude>
> +      </excludes>
> +    </fileSet>
> +  </fileSets>
> +</assembly>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
> ----------------------------------------------------------------------
> diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
> new file mode 100644
> index 0000000..a9caabb
> --- /dev/null
> +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/BlurIndexCounter.java
> @@ -0,0 +1,17 @@
> +package org.apache.blur.mapreduce.lib.update;
> +
> +public enum BlurIndexCounter {
> +
> +  NEW_RECORDS,
> +  ROW_IDS_FROM_INDEX,
> +  ROW_IDS_TO_UPDATE_FROM_NEW_DATA,
> +  ROW_IDS_FROM_NEW_DATA,
> +
> +  INPUT_FORMAT_MAPPER,
> +  INPUT_FORMAT_EXISTING_RECORDS,
> +
> +  LOOKUP_MAPPER,
> +  LOOKUP_MAPPER_EXISTING_RECORDS,
> +  LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ea50630a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
> ----------------------------------------------------------------------
> diff --git a/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
> new file mode 100644
> index 0000000..d44adf1
> --- /dev/null
> +++ b/blur-indexer/src/main/java/org/apache/blur/mapreduce/lib/update/ClusterDriver.java
> @@ -0,0 +1,362 @@
> +package org.apache.blur.mapreduce.lib.update;
> +
> +import java.io.ByteArrayInputStream;
> +import java.io.ByteArrayOutputStream;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.net.URL;
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +import java.util.Set;
> +import java.util.UUID;
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.ExecutionException;
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +import java.util.concurrent.Future;
> +import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.atomic.AtomicBoolean;
> +
> +import org.apache.blur.log.Log;
> +import org.apache.blur.log.LogFactory;
> +import org.apache.blur.mapreduce.lib.BlurInputFormat;
> +import org.apache.blur.thirdparty.thrift_0_9_0.TException;
> +import org.apache.blur.thrift.BlurClient;
> +import org.apache.blur.thrift.generated.Blur.Iface;
> +import org.apache.blur.thrift.generated.BlurException;
> +import org.apache.blur.thrift.generated.TableDescriptor;
> +import org.apache.blur.thrift.generated.TableStats;
> +import org.apache.blur.utils.BlurConstants;
> +import org.apache.commons.io.IOUtils;
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.conf.Configured;
> +import org.apache.hadoop.fs.FSDataInputStream;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.fs.permission.FsAction;
> +import org.apache.hadoop.mapreduce.Cluster;
> +import org.apache.hadoop.mapreduce.Job;
> +import org.apache.hadoop.mapreduce.JobID;
> +import org.apache.hadoop.mapreduce.JobStatus;
> +import org.apache.hadoop.util.Tool;
> +import org.apache.hadoop.util.ToolRunner;
> +import org.apache.hadoop.yarn.exceptions.YarnException;
> +import org.apache.log4j.LogManager;
> +import org.apache.log4j.xml.DOMConfigurator;
> +
> +public class ClusterDriver extends Configured implements Tool {
> +
> +  private static final String BLUR_ENV = "blur.env";
> +  private static final Log LOG = LogFactory.getLog(ClusterDriver.class);
> +  private static final String _SEP = "_";
> +  private static final String IMPORT = "import";
> +
> +  public static void main(String[] args) throws Exception {
> +    String logFilePath = System.getenv("BLUR_INDEXER_LOG_FILE");
> +    System.out.println("Log file path [" + logFilePath + "]");
> +    System.setProperty("BLUR_INDEXER_LOG_FILE", logFilePath);
> +    URL url = ClusterDriver.class.getResource("/program-log4j.xml");
> +    if (url != null) {
> +      LOG.info("Reseting log4j config from classpath resource [{0}]", url);
> +      LogManager.resetConfiguration();
> +      DOMConfigurator.configure(url);
> +    }
> +    int res = ToolRunner.run(new Configuration(), new ClusterDriver(), args);

Not sure what this thing does yet, but it seems we should validate
those args since they're accessed blindly in run...

--tim
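
For reference, a minimal sketch of the kind of up-front argument
validation Tim is suggesting for run(). It is hypothetical, not taken
from the commit: the class name ValidatingDriver, the expected
arguments (an environment name and a table name), and the usage message
are all assumptions made for illustration; only the Tool/ToolRunner
pattern matches the ClusterDriver code quoted above.

  package org.apache.blur.mapreduce.lib.update;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  // Hypothetical sketch: fail fast on bad arguments instead of letting
  // run() index into args[] blindly and throw
  // ArrayIndexOutOfBoundsException mid-job.
  public class ValidatingDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
      if (args == null || args.length < 2) {
        System.err.println("Usage: ValidatingDriver <blur.env> <table>");
        return 1; // non-zero result is passed back through ToolRunner.run()
      }
      String blurEnv = args[0]; // assumed first argument
      String table = args[1];   // assumed second argument
      System.out.println("env=" + blurEnv + " table=" + table);
      // ...actual job setup would go here...
      return 0;
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(new Configuration(), new ValidatingDriver(), args));
    }
  }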