Posted to commits@lucene.apache.org by sh...@apache.org on 2017/03/27 06:05:25 UTC
[31/39] lucene-solr:feature/autoscaling: SOLR-9221: Remove Solr
contribs: map-reduce, morphlines-core and morphlines-cell
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCounters.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCounters.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCounters.java
deleted file mode 100644
index 88e9acb..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrCounters.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-public enum SolrCounters {
-
- DOCUMENTS_WRITTEN (getClassName(SolrReducer.class)
- + ": Number of documents processed"),
-
- BATCHES_WRITTEN (getClassName(SolrReducer.class)
- + ": Number of document batches processed"),
-
- BATCH_WRITE_TIME (getClassName(SolrReducer.class)
- + ": Time spent by reducers writing batches [ms]"),
-
- PHYSICAL_REDUCER_MERGE_TIME (getClassName(SolrReducer.class)
- + ": Time spent by reducers on physical merges [ms]"),
-
- LOGICAL_TREE_MERGE_TIME (getClassName(TreeMergeMapper.class)
- + ": Time spent on logical tree merges [ms]"),
-
- PHYSICAL_TREE_MERGE_TIME (getClassName(TreeMergeMapper.class)
- + ": Time spent on physical tree merges [ms]");
-
- private final String label;
-
- private SolrCounters(String label) {
- this.label = label;
- }
-
- public String toString() {
- return label;
- }
-
- private static String getClassName(Class<?> clazz) {
- return Utils.getShortClassName(clazz);
- }
-
-}
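
For context on how these labels surface: counters in this contrib are incremented with the explicit (group, name) form, using the enum class name as the group and the toString() label as the counter name (see SolrRecordWriter.incrementCounter and TreeMergeOutputFormat further down). A minimal standalone sketch that prints the (group, name) pair each counter reports under; the exact labels depend on Utils.getShortClassName():

    package org.apache.solr.hadoop;

    // Minimal sketch: print the (group, name) pair each counter reports under.
    public class SolrCountersLabels {
      public static void main(String[] args) {
        String group = SolrCounters.class.getName();
        for (SolrCounters counter : SolrCounters.values()) {
          // toString() returns the human-readable label, e.g. something like
          // "SolrReducer: Number of documents processed"
          System.out.println(group + " / " + counter);
        }
      }
    }
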
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrInputDocumentWritable.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrInputDocumentWritable.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrInputDocumentWritable.java
deleted file mode 100644
index e043f7a..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrInputDocumentWritable.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.FastOutputStream;
-import org.apache.solr.common.util.JavaBinCodec;
-
-public class SolrInputDocumentWritable implements Writable {
- private SolrInputDocument sid;
-
- public SolrInputDocumentWritable() {
- }
-
- public SolrInputDocumentWritable(SolrInputDocument sid) {
- this.sid = sid;
- }
-
- public SolrInputDocument getSolrInputDocument() {
- return sid;
- }
-
- @Override
- public String toString() {
- return sid.toString();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- JavaBinCodec codec = new JavaBinCodec();
- FastOutputStream daos = FastOutputStream.wrap(DataOutputOutputStream.constructOutputStream(out));
- codec.init(daos);
- try {
- codec.writeVal(sid);
- } finally {
- daos.flushBuffer();
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- JavaBinCodec codec = new JavaBinCodec();
- UnbufferedDataInputInputStream dis = new UnbufferedDataInputInputStream(in);
- sid = (SolrInputDocument)codec.readVal(dis);
- }
-
-}
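
For context on the serialization contract above: the Writable round-trips a SolrInputDocument through javabin over plain DataOutput/DataInput streams. A minimal in-memory sketch (field names and values are illustrative):

    package org.apache.solr.hadoop;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.solr.common.SolrInputDocument;

    public class SolrInputDocumentWritableRoundTrip {
      public static void main(String[] args) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "doc1");

        // write() streams the document as javabin to any DataOutput
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new SolrInputDocumentWritable(doc).write(new DataOutputStream(bytes));

        // readFields() reconstructs the document from the matching DataInput
        SolrInputDocumentWritable copy = new SolrInputDocumentWritable();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getSolrInputDocument().getFieldValue("id")); // doc1
      }
    }
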
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrMapper.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrMapper.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrMapper.java
deleted file mode 100644
index 6c73f1b..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class SolrMapper<KEYIN, VALUEIN> extends Mapper<KEYIN, VALUEIN, Text, SolrInputDocumentWritable> {
-
- private Path solrHomeDir;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- Utils.getLogConfigFile(context.getConfiguration());
- super.setup(context);
- solrHomeDir = SolrRecordWriter.findSolrConfig(context.getConfiguration());
- }
-
- protected Path getSolrHomeDir() {
- return solrHomeDir;
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrOutputFormat.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrOutputFormat.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrOutputFormat.java
deleted file mode 100644
index b52939e..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrOutputFormat.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Locale;
-import java.util.Set;
-import java.util.UUID;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SolrOutputFormat<K, V> extends FileOutputFormat<K, V> {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- /**
- * The parameter used to pass the Solr config zip file information. This will
- * be the HDFS path to the configuration zip file.
- */
- public static final String SETUP_OK = "solr.output.format.setup";
-
- /** The key used to pass the zip file name through the configuration. */
- public static final String ZIP_NAME = "solr.zip.name";
-
- /**
- * The base name of the zip file containing the configuration information.
- * This file is passed via the distributed cache using a unique name, obtained
- * via {@link #getZipName(Configuration jobConf)}.
- */
- public static final String ZIP_FILE_BASE_NAME = "solr.zip";
-
- /**
- * The key used to pass the boolean configuration parameter that selects
- * regular or zip file output.
- */
- public static final String OUTPUT_ZIP_FILE = "solr.output.zip.format";
-
- static int defaultSolrWriterThreadCount = 0;
-
- public static final String SOLR_WRITER_THREAD_COUNT = "solr.record.writer.num.threads";
-
- static int defaultSolrWriterQueueSize = 1;
-
- public static final String SOLR_WRITER_QUEUE_SIZE = "solr.record.writer.max.queues.size";
-
- static int defaultSolrBatchSize = 20;
-
- public static final String SOLR_RECORD_WRITER_BATCH_SIZE = "solr.record.writer.batch.size";
-
- public static final String SOLR_RECORD_WRITER_MAX_SEGMENTS = "solr.record.writer.maxSegments";
-
- public static String getSetupOk() {
- return SETUP_OK;
- }
-
- /** Set the number of threads used for index writing */
- public static void setSolrWriterThreadCount(int count, Configuration conf) {
- conf.setInt(SOLR_WRITER_THREAD_COUNT, count);
- }
-
- /** Get the number of threads used for index writing */
- public static int getSolrWriterThreadCount(Configuration conf) {
- return conf.getInt(SOLR_WRITER_THREAD_COUNT, defaultSolrWriterThreadCount);
- }
-
- /**
- * Set the maximum size of the queue for documents to be written to the
- * index.
- */
- public static void setSolrWriterQueueSize(int count, Configuration conf) {
- conf.setInt(SOLR_WRITER_QUEUE_SIZE, count);
- }
-
- /** Return the maximum number of documents that may be pending index writing. */
- public static int getSolrWriterQueueSize(Configuration conf) {
- return conf.getInt(SOLR_WRITER_QUEUE_SIZE, defaultSolrWriterQueueSize);
- }
-
- /**
- * Return the file name portion of the configuration zip file, from the
- * configuration.
- */
- public static String getZipName(Configuration conf) {
- return conf.get(ZIP_NAME, ZIP_FILE_BASE_NAME);
- }
-
- /**
- * Configure the job to output zip files of the output index, or full
- * directory trees. Zip files are about 1/5th the size of the raw index, and
- * much faster to write, but take more CPU to create.
- *
- * @param output true if should output zip files
- * @param conf to use
- */
- public static void setOutputZipFormat(boolean output, Configuration conf) {
- conf.setBoolean(OUTPUT_ZIP_FILE, output);
- }
-
- /**
- * Return true if the output should be a zip file of the index, rather than
- * the raw index
- *
- * @param conf to use
- * @return true if output zip files is on
- */
- public static boolean isOutputZipFormat(Configuration conf) {
- return conf.getBoolean(OUTPUT_ZIP_FILE, false);
- }
-
- public static String getOutputName(JobContext job) {
- return FileOutputFormat.getOutputName(job);
- }
-
- @Override
- public void checkOutputSpecs(JobContext job) throws IOException {
- super.checkOutputSpecs(job);
- if (job.getConfiguration().get(SETUP_OK) == null) {
- throw new IOException("Solr home cache not set up!");
- }
- }
-
-
- @Override
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- Utils.getLogConfigFile(context.getConfiguration());
- Path workDir = getDefaultWorkFile(context, "");
- int batchSize = getBatchSize(context.getConfiguration());
- return new SolrRecordWriter<>(context, workDir, batchSize);
- }
-
- public static void setupSolrHomeCache(File solrHomeDir, Job job) throws IOException{
- File solrHomeZip = createSolrHomeZip(solrHomeDir);
- addSolrConfToDistributedCache(job, solrHomeZip);
- }
-
- public static File createSolrHomeZip(File solrHomeDir) throws IOException {
- return createSolrHomeZip(solrHomeDir, false);
- }
-
- private static File createSolrHomeZip(File solrHomeDir, boolean safeToModify) throws IOException {
- if (solrHomeDir == null || !(solrHomeDir.exists() && solrHomeDir.isDirectory())) {
- throw new IOException("Invalid solr home: " + solrHomeDir);
- }
- File solrHomeZip = File.createTempFile("solr", ".zip");
- createZip(solrHomeDir, solrHomeZip);
- return solrHomeZip;
- }
-
- public static void addSolrConfToDistributedCache(Job job, File solrHomeZip)
- throws IOException {
- // Make a reasonably unique name for the zip file in the distributed cache
- // to avoid collisions if multiple jobs are running.
- String hdfsZipName = UUID.randomUUID().toString() + '.'
- + ZIP_FILE_BASE_NAME;
- Configuration jobConf = job.getConfiguration();
- jobConf.set(ZIP_NAME, hdfsZipName);
-
- Path zipPath = new Path("/tmp", getZipName(jobConf));
- FileSystem fs = FileSystem.get(jobConf);
- fs.copyFromLocalFile(new Path(solrHomeZip.toString()), zipPath);
- final URI baseZipUrl = fs.getUri().resolve(
- zipPath.toString() + '#' + getZipName(jobConf));
-
- DistributedCache.addCacheArchive(baseZipUrl, jobConf);
- LOG.debug("Set Solr distributed cache: {}", Arrays.asList(job.getCacheArchives()));
- LOG.debug("Set zipPath: {}", zipPath);
- // Actually send the path for the configuration zip file
- jobConf.set(SETUP_OK, zipPath.toString());
- }
-
- private static void createZip(File dir, File out) throws IOException {
- HashSet<File> files = new HashSet<>();
- // take only conf/ and lib/
- for (String allowedDirectory : SolrRecordWriter
- .getAllowedConfigDirectories()) {
- File configDir = new File(dir, allowedDirectory);
- boolean configDirExists;
- // If the directory does not exist and is required, bail out
- if (!(configDirExists = configDir.exists())
- && SolrRecordWriter.isRequiredConfigDirectory(allowedDirectory)) {
- throw new IOException(String.format(Locale.ENGLISH,
- "required configuration directory %s is not present in %s",
- allowedDirectory, dir));
- }
- if (!configDirExists) {
- continue;
- }
- listFiles(configDir, files); // Store the files in the existing, allowed
- // directory configDir, in the list of files
- // to store in the zip file
- }
-
- Files.deleteIfExists(out.toPath());
- int subst = dir.toString().length();
- ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(out));
- byte[] buf = new byte[1024];
- for (File f : files) {
- ZipEntry ze = new ZipEntry(f.toString().substring(subst));
- zos.putNextEntry(ze);
- InputStream is = new FileInputStream(f);
- int cnt;
- while ((cnt = is.read(buf)) >= 0) {
- zos.write(buf, 0, cnt);
- }
- is.close();
- zos.flush();
- zos.closeEntry();
- }
-
- ZipEntry ze = new ZipEntry("solr.xml");
- zos.putNextEntry(ze);
- zos.write("<solr></solr>".getBytes("UTF-8"));
- zos.flush();
- zos.closeEntry();
- zos.close();
- }
-
- private static void listFiles(File dir, Set<File> files) throws IOException {
- File[] list = dir.listFiles();
-
- if (list == null && dir.isFile()) {
- files.add(dir);
- return;
- }
-
- for (File f : list) {
- if (f.isFile()) {
- files.add(f);
- } else {
- listFiles(f, files);
- }
- }
- }
-
- public static int getBatchSize(Configuration jobConf) {
- return jobConf.getInt(SolrOutputFormat.SOLR_RECORD_WRITER_BATCH_SIZE,
- defaultSolrBatchSize);
- }
-
- public static void setBatchSize(int count, Configuration jobConf) {
- jobConf.setInt(SOLR_RECORD_WRITER_BATCH_SIZE, count);
- }
-
-}
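
To make the configuration knobs above concrete, a minimal sketch of wiring this output format into a job; the solr home path is illustrative and must contain the required conf/ directory:

    package org.apache.solr.hadoop;

    import java.io.File;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class SolrOutputFormatSetup {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        Configuration conf = job.getConfiguration();

        // Ship the local solr home (conf/, lib/) to tasks via the distributed
        // cache; this also sets SETUP_OK, which checkOutputSpecs() verifies.
        SolrOutputFormat.setupSolrHomeCache(new File("/path/to/solr/home"), job);

        SolrOutputFormat.setBatchSize(100, conf);           // docs per indexing batch
        SolrOutputFormat.setSolrWriterThreadCount(2, conf); // index writer threads
        SolrOutputFormat.setOutputZipFormat(false, conf);   // raw index dirs, not zips

        job.setOutputFormatClass(SolrOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SolrInputDocumentWritable.class);
      }
    }
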
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrRecordWriter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrRecordWriter.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrRecordWriter.java
deleted file mode 100644
index f528c54..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrRecordWriter.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.DirectoryFactory;
-import org.apache.solr.core.HdfsDirectoryFactory;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrResourceLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public final static List<String> allowedConfigDirectories = new ArrayList<>(
- Arrays.asList(new String[] { "conf", "lib", "solr.xml", "core1" }));
-
- public final static Set<String> requiredConfigDirectories = new HashSet<>();
-
- static {
- requiredConfigDirectories.add("conf");
- }
-
- /**
- * Return the list of directory names that may be included in the
- * configuration data passed to the tasks.
- *
- * @return an UnmodifiableList of directory names
- */
- public static List<String> getAllowedConfigDirectories() {
- return Collections.unmodifiableList(allowedConfigDirectories);
- }
-
- /**
- * Check whether the passed-in directory is required to be present in the
- * configuration data set.
- *
- * @param directory The directory to check
- * @return true if the directory is required.
- */
- public static boolean isRequiredConfigDirectory(final String directory) {
- return requiredConfigDirectories.contains(directory);
- }
-
- /** The path that the final index will be written to */
-
- /** The location in a local temporary directory that the index is built in. */
-
-// /**
-// * If true, create a zip file of the completed index in the final storage
-// * location A .zip will be appended to the final output name if it is not
-// * already present.
-// */
-// private boolean outputZipFile = false;
-
- private final HeartBeater heartBeater;
- private final BatchWriter batchWriter;
- private final List<SolrInputDocument> batch;
- private final int batchSize;
- private long numDocsWritten = 0;
- private long nextLogTime = System.nanoTime();
-
- private static HashMap<TaskID, Reducer<?,?,?,?>.Context> contextMap = new HashMap<>();
-
- public SolrRecordWriter(TaskAttemptContext context, Path outputShardDir, int batchSize) {
- this.batchSize = batchSize;
- this.batch = new ArrayList<>(batchSize);
- Configuration conf = context.getConfiguration();
-
- // setLogLevel("org.apache.solr.core", "WARN");
- // setLogLevel("org.apache.solr.update", "WARN");
-
- heartBeater = new HeartBeater(context);
- try {
- heartBeater.needHeartBeat();
-
- Path solrHomeDir = SolrRecordWriter.findSolrConfig(conf);
- FileSystem fs = outputShardDir.getFileSystem(conf);
- EmbeddedSolrServer solr = createEmbeddedSolrServer(solrHomeDir, fs, outputShardDir);
- batchWriter = new BatchWriter(solr, batchSize,
- context.getTaskAttemptID().getTaskID(),
- SolrOutputFormat.getSolrWriterThreadCount(conf),
- SolrOutputFormat.getSolrWriterQueueSize(conf));
-
- } catch (Exception e) {
- throw new IllegalStateException(String.format(Locale.ENGLISH,
- "Failed to initialize record writer for %s, %s", context.getJobName(), conf
- .get("mapred.task.id")), e);
- } finally {
- heartBeater.cancelHeartBeat();
- }
- }
-
- public static EmbeddedSolrServer createEmbeddedSolrServer(Path solrHomeDir, FileSystem fs, Path outputShardDir)
- throws IOException {
-
- LOG.info("Creating embedded Solr server with solrHomeDir: " + solrHomeDir + ", fs: " + fs + ", outputShardDir: " + outputShardDir);
-
- Path solrDataDir = new Path(outputShardDir, "data");
-
- String dataDirStr = solrDataDir.toUri().toString();
-
- SolrResourceLoader loader = new SolrResourceLoader(Paths.get(solrHomeDir.toString()), null, null);
-
- LOG.info(String
- .format(Locale.ENGLISH,
- "Constructed instance information solr.home %s (%s), instance dir %s, conf dir %s, writing index to solr.data.dir %s, with permdir %s",
- solrHomeDir, solrHomeDir.toUri(), loader.getInstancePath(),
- loader.getConfigDir(), dataDirStr, outputShardDir));
-
- // TODO: This is fragile and should be well documented
- System.setProperty("solr.directoryFactory", HdfsDirectoryFactory.class.getName());
- System.setProperty("solr.lock.type", DirectoryFactory.LOCK_TYPE_HDFS);
- System.setProperty("solr.hdfs.nrtcachingdirectory", "false");
- System.setProperty("solr.hdfs.blockcache.enabled", "false");
- System.setProperty("solr.autoCommit.maxTime", "600000");
- System.setProperty("solr.autoSoftCommit.maxTime", "-1");
-
- CoreContainer container = new CoreContainer(loader);
- container.load();
- SolrCore core = container.create("", ImmutableMap.of(CoreDescriptor.CORE_DATADIR, dataDirStr));
-
- if (!(core.getDirectoryFactory() instanceof HdfsDirectoryFactory)) {
- throw new UnsupportedOperationException(
- "Invalid configuration. Currently, the only DirectoryFactory supported is "
- + HdfsDirectoryFactory.class.getSimpleName());
- }
-
- EmbeddedSolrServer solr = new EmbeddedSolrServer(container, "");
- return solr;
- }
-
- public static void incrementCounter(TaskID taskId, String groupName, String counterName, long incr) {
- Reducer<?,?,?,?>.Context context = contextMap.get(taskId);
- if (context != null) {
- context.getCounter(groupName, counterName).increment(incr);
- }
- }
-
- public static void incrementCounter(TaskID taskId, Enum<?> counterName, long incr) {
- Reducer<?,?,?,?>.Context context = contextMap.get(taskId);
- if (context != null) {
- context.getCounter(counterName).increment(incr);
- }
- }
-
- public static void addReducerContext(Reducer<?,?,?,?>.Context context) {
- TaskID taskID = context.getTaskAttemptID().getTaskID();
- contextMap.put(taskID, context);
- }
-
- public static Path findSolrConfig(Configuration conf) throws IOException {
- // FIXME when mrunit supports the new cache apis
- //URI[] localArchives = context.getCacheArchives();
- Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
- for (Path unpackedDir : localArchives) {
- if (unpackedDir.getName().equals(SolrOutputFormat.getZipName(conf))) {
- LOG.info("Using this unpacked directory as solr home: {}", unpackedDir);
- return unpackedDir;
- }
- }
- throw new IOException(String.format(Locale.ENGLISH,
- "No local cache archives, where is %s:%s", SolrOutputFormat
- .getSetupOk(), SolrOutputFormat.getZipName(conf)));
- }
-
- /**
- * Write a record. This method accumulates records into a batch, and when
- * {@link #batchSize} items are present, flushes it to the indexer. The writes
- * can take a substantial amount of time, depending on {@link #batchSize}. If
- * there is heavy disk contention, the writes may take more than the 600-second
- * default timeout.
- */
- @Override
- public void write(K key, V value) throws IOException {
- heartBeater.needHeartBeat();
- try {
- try {
- SolrInputDocumentWritable sidw = (SolrInputDocumentWritable) value;
- batch.add(sidw.getSolrInputDocument());
- if (batch.size() >= batchSize) {
- batchWriter.queueBatch(batch);
- numDocsWritten += batch.size();
- if (System.nanoTime() >= nextLogTime) {
- LOG.info("docsWritten: {}", numDocsWritten);
- nextLogTime += TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
- }
- batch.clear();
- }
- } catch (SolrServerException e) {
- throw new IOException(e);
- }
- } finally {
- heartBeater.cancelHeartBeat();
- }
-
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- if (context != null) {
- heartBeater.setProgress(context);
- }
- try {
- heartBeater.needHeartBeat();
- if (batch.size() > 0) {
- batchWriter.queueBatch(batch);
- numDocsWritten += batch.size();
- batch.clear();
- }
- LOG.info("docsWritten: {}", numDocsWritten);
- batchWriter.close(context);
-// if (outputZipFile) {
-// context.setStatus("Writing Zip");
-// packZipFile(); // Written to the perm location
-// } else {
-// context.setStatus("Copying Index");
-// fs.completeLocalOutput(perm, temp); // copy to dfs
-// }
- } catch (Exception e) {
- if (e instanceof IOException) {
- throw (IOException) e;
- }
- throw new IOException(e);
- } finally {
- heartBeater.cancelHeartBeat();
- heartBeater.close();
-// File tempFile = new File(temp.toString());
-// if (tempFile.exists()) {
-// FileUtils.forceDelete(new File(temp.toString()));
-// }
- }
-
- context.setStatus("Done");
- }
-
-// private void packZipFile() throws IOException {
-// FSDataOutputStream out = null;
-// ZipOutputStream zos = null;
-// int zipCount = 0;
-// LOG.info("Packing zip file for " + perm);
-// try {
-// out = fs.create(perm, false);
-// zos = new ZipOutputStream(out);
-//
-// String name = perm.getName().replaceAll(".zip$", "");
-// LOG.info("adding index directory" + temp);
-// zipCount = zipDirectory(conf, zos, name, temp.toString(), temp);
-// /**
-// for (String configDir : allowedConfigDirectories) {
-// if (!isRequiredConfigDirectory(configDir)) {
-// continue;
-// }
-// final Path confPath = new Path(solrHome, configDir);
-// LOG.info("adding configdirectory" + confPath);
-//
-// zipCount += zipDirectory(conf, zos, name, solrHome.toString(), confPath);
-// }
-// **/
-// } catch (Throwable ohFoo) {
-// LOG.error("packZipFile exception", ohFoo);
-// if (ohFoo instanceof RuntimeException) {
-// throw (RuntimeException) ohFoo;
-// }
-// if (ohFoo instanceof IOException) {
-// throw (IOException) ohFoo;
-// }
-// throw new IOException(ohFoo);
-//
-// } finally {
-// if (zos != null) {
-// if (zipCount == 0) { // If no entries were written, only close out, as
-// // the zip will throw an error
-// LOG.error("No entries written to zip file " + perm);
-// fs.delete(perm, false);
-// // out.close();
-// } else {
-// LOG.info(String.format("Wrote %d items to %s for %s", zipCount, perm,
-// temp));
-// zos.close();
-// }
-// }
-// }
-// }
-//
-// /**
-// * Write a file to a zip output stream, removing leading path name components
-// * from the actual file name when creating the zip file entry.
-// *
-// * The entry placed in the zip file is <code>baseName</code>/
-// * <code>relativePath</code>, where <code>relativePath</code> is constructed
-// * by removing a leading <code>root</code> from the path for
-// * <code>itemToZip</code>.
-// *
-// * If <code>itemToZip</code> is an empty directory, it is ignored. If
-// * <code>itemToZip</code> is a directory, the contents of the directory are
-// * added recursively.
-// *
-// * @param zos The zip output stream
-// * @param baseName The base name to use for the file name entry in the zip
-// * file
-// * @param root The path to remove from <code>itemToZip</code> to make a
-// * relative path name
-// * @param itemToZip The path to the file to be added to the zip file
-// * @return the number of entries added
-// * @throws IOException
-// */
-// static public int zipDirectory(final Configuration conf,
-// final ZipOutputStream zos, final String baseName, final String root,
-// final Path itemToZip) throws IOException {
-// LOG
-// .info(String
-// .format("zipDirectory: %s %s %s", baseName, root, itemToZip));
-// LocalFileSystem localFs = FileSystem.getLocal(conf);
-// int count = 0;
-//
-// final FileStatus itemStatus = localFs.getFileStatus(itemToZip);
-// if (itemStatus.isDirectory()) {
-// final FileStatus[] statai = localFs.listStatus(itemToZip);
-//
-// // Add a directory entry to the zip file
-// final String zipDirName = relativePathForZipEntry(itemToZip.toUri()
-// .getPath(), baseName, root);
-// final ZipEntry dirZipEntry = new ZipEntry(zipDirName
-// + Path.SEPARATOR_CHAR);
-// LOG.info(String.format("Adding directory %s to zip", zipDirName));
-// zos.putNextEntry(dirZipEntry);
-// zos.closeEntry();
-// count++;
-//
-// if (statai == null || statai.length == 0) {
-// LOG.info(String.format("Skipping empty directory %s", itemToZip));
-// return count;
-// }
-// for (FileStatus status : statai) {
-// count += zipDirectory(conf, zos, baseName, root, status.getPath());
-// }
-// LOG.info(String.format("Wrote %d entries for directory %s", count,
-// itemToZip));
-// return count;
-// }
-//
-// final String inZipPath = relativePathForZipEntry(itemToZip.toUri()
-// .getPath(), baseName, root);
-//
-// if (inZipPath.length() == 0) {
-// LOG.warn(String.format("Skipping empty zip file path for %s (%s %s)",
-// itemToZip, root, baseName));
-// return 0;
-// }
-//
-// // Take empty files in case the place holder is needed
-// FSDataInputStream in = null;
-// try {
-// in = localFs.open(itemToZip);
-// final ZipEntry ze = new ZipEntry(inZipPath);
-// ze.setTime(itemStatus.getModificationTime());
-// // Comments confuse looking at the zip file
-// // ze.setComment(itemToZip.toString());
-// zos.putNextEntry(ze);
-//
-// IOUtils.copyBytes(in, zos, conf, false);
-// zos.closeEntry();
-// LOG.info(String.format("Wrote %d entries for file %s", count, itemToZip));
-// return 1;
-// } finally {
-// in.close();
-// }
-//
-// }
-//
-// static String relativePathForZipEntry(final String rawPath,
-// final String baseName, final String root) {
-// String relativePath = rawPath.replaceFirst(Pattern.quote(root.toString()),
-// "");
-// LOG.info(String.format("RawPath %s, baseName %s, root %s, first %s",
-// rawPath, baseName, root, relativePath));
-//
-// if (relativePath.startsWith(Path.SEPARATOR)) {
-// relativePath = relativePath.substring(1);
-// }
-// LOG.info(String.format(
-// "RawPath %s, baseName %s, root %s, post leading slash %s", rawPath,
-// baseName, root, relativePath));
-// if (relativePath.isEmpty()) {
-// LOG.warn(String.format(
-// "No data after root (%s) removal from raw path %s", root, rawPath));
-// return baseName;
-// }
-// // Construct the path that will be written to the zip file, including
-// // removing any leading '/' characters
-// String inZipPath = baseName + Path.SEPARATOR_CHAR + relativePath;
-//
-// LOG.info(String.format("RawPath %s, baseName %s, root %s, inZip 1 %s",
-// rawPath, baseName, root, inZipPath));
-// if (inZipPath.startsWith(Path.SEPARATOR)) {
-// inZipPath = inZipPath.substring(1);
-// }
-// LOG.info(String.format("RawPath %s, baseName %s, root %s, inZip 2 %s",
-// rawPath, baseName, root, inZipPath));
-//
-// return inZipPath;
-//
-// }
-//
- /*
- static boolean setLogLevel(String packageName, String level) {
- Log logger = LogFactory.getLog(packageName);
- if (logger == null) {
- return false;
- }
- // look for: org.apache.commons.logging.impl.SLF4JLocationAwareLog
- LOG.warn("logger class:"+logger.getClass().getName());
- if (logger instanceof Log4JLogger) {
- process(((Log4JLogger) logger).getLogger(), level);
- return true;
- }
- if (logger instanceof Jdk14Logger) {
- process(((Jdk14Logger) logger).getLogger(), level);
- return true;
- }
- return false;
- }
-
- public static void process(org.apache.log4j.Logger log, String level) {
- if (level != null) {
- log.setLevel(org.apache.log4j.Level.toLevel(level));
- }
- }
-
- public static void process(java.util.logging.Logger log, String level) {
- if (level != null) {
- log.setLevel(java.util.logging.Level.parse(level));
- }
- }
- */
-}
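
As a standalone illustration of the batching policy documented on write() above (documents accumulate and are handed off each time batchSize is reached); the names below are illustrative stand-ins, not contrib classes:

    import java.util.ArrayList;
    import java.util.List;

    public class BatchingSketch {
      public static void main(String[] args) {
        int batchSize = 20;
        List<String> batch = new ArrayList<>(batchSize);
        int flushes = 0;
        for (int doc = 0; doc < 100; doc++) {
          batch.add("doc" + doc);
          if (batch.size() >= batchSize) { // same threshold check as write()
            flushes++;                     // stands in for batchWriter.queueBatch(batch)
            batch.clear();
          }
        }
        System.out.println(flushes + " batches queued"); // prints "5 batches queued"
      }
    }
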
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
deleted file mode 100644
index 5e10c97..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/SolrReducer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.hadoop.dedup.NoChangeUpdateConflictResolver;
-import org.apache.solr.hadoop.dedup.RetainMostRecentUpdateConflictResolver;
-import org.apache.solr.hadoop.dedup.UpdateConflictResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.kitesdk.morphline.api.ExceptionHandler;
-import org.kitesdk.morphline.base.FaultTolerance;
-import com.google.common.base.Preconditions;
-
-/**
- * This class loads the mapper's SolrInputDocuments into one EmbeddedSolrServer
- * per reducer. Each such reducer and Solr server can be seen as a (micro)
- * shard. The Solr servers store their data in HDFS.
- *
- * More specifically, this class consumes a list of <docId, SolrInputDocument>
- * pairs, sorted by docId, and sends them to an embedded Solr server to generate
- * a Solr index shard from the documents.
- */
-public class SolrReducer extends Reducer<Text, SolrInputDocumentWritable, Text, SolrInputDocumentWritable> {
-
- private UpdateConflictResolver resolver;
- private HeartBeater heartBeater;
- private ExceptionHandler exceptionHandler;
-
- public static final String UPDATE_CONFLICT_RESOLVER = SolrReducer.class.getName() + ".updateConflictResolver";
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- verifyPartitionAssignment(context);
- SolrRecordWriter.addReducerContext(context);
- Class<? extends UpdateConflictResolver> resolverClass = context.getConfiguration().getClass(
- UPDATE_CONFLICT_RESOLVER, RetainMostRecentUpdateConflictResolver.class, UpdateConflictResolver.class);
-
- this.resolver = ReflectionUtils.newInstance(resolverClass, context.getConfiguration());
- /*
- * Note that ReflectionUtils.newInstance() above also implicitly calls
- * resolver.configure(context.getConfiguration()) if the resolver
- * implements org.apache.hadoop.conf.Configurable
- */
-
- this.exceptionHandler = new FaultTolerance(
- context.getConfiguration().getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false),
- context.getConfiguration().getBoolean(FaultTolerance.IS_IGNORING_RECOVERABLE_EXCEPTIONS, false),
- context.getConfiguration().get(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES, SolrServerException.class.getName()));
-
- this.heartBeater = new HeartBeater(context);
- }
-
- protected void reduce(Text key, Iterable<SolrInputDocumentWritable> values, Context context) throws IOException, InterruptedException {
- heartBeater.needHeartBeat();
- try {
- values = resolve(key, values, context);
- super.reduce(key, values, context);
- } catch (Exception e) {
- LOG.error("Unable to process key " + key, e);
- context.getCounter(getClass().getName() + ".errors", e.getClass().getName()).increment(1);
- exceptionHandler.handleException(e, null);
- } finally {
- heartBeater.cancelHeartBeat();
- }
- }
-
- private Iterable<SolrInputDocumentWritable> resolve(
- final Text key, final Iterable<SolrInputDocumentWritable> values, final Context context) {
-
- if (resolver instanceof NoChangeUpdateConflictResolver) {
- return values; // fast path
- }
- return new Iterable<SolrInputDocumentWritable>() {
- @Override
- public Iterator<SolrInputDocumentWritable> iterator() {
- return new WrapIterator(resolver.orderUpdates(key, new UnwrapIterator(values.iterator()), context));
- }
- };
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- heartBeater.close();
- super.cleanup(context);
- }
-
- /*
- * Verify that if a mapper's partitioner sends an item to partition X, that item
- * is sent to the reducer with taskID == X. This invariant is currently required for Solr
- * documents to end up in the right Solr shard.
- */
- private void verifyPartitionAssignment(Context context) {
- if ("true".equals(System.getProperty("verifyPartitionAssignment", "true"))) {
- String partitionStr = context.getConfiguration().get("mapred.task.partition");
- if (partitionStr == null) {
- partitionStr = context.getConfiguration().get("mapreduce.task.partition");
- }
- int partition = Integer.parseInt(partitionStr);
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- Preconditions.checkArgument(partition == taskId,
- "mapred.task.partition: " + partition + " not equal to reducer taskId: " + taskId);
- }
- }
-
-
- ///////////////////////////////////////////////////////////////////////////////
- // Nested classes:
- ///////////////////////////////////////////////////////////////////////////////
- private static final class WrapIterator implements Iterator<SolrInputDocumentWritable> {
-
- private Iterator<SolrInputDocument> parent;
-
- private WrapIterator(Iterator<SolrInputDocument> parent) {
- this.parent = parent;
- }
-
- @Override
- public boolean hasNext() {
- return parent.hasNext();
- }
-
- @Override
- public SolrInputDocumentWritable next() {
- return new SolrInputDocumentWritable(parent.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-
-
- ///////////////////////////////////////////////////////////////////////////////
- // Nested classes:
- ///////////////////////////////////////////////////////////////////////////////
- private static final class UnwrapIterator implements Iterator<SolrInputDocument> {
-
- private Iterator<SolrInputDocumentWritable> parent;
-
- private UnwrapIterator(Iterator<SolrInputDocumentWritable> parent) {
- this.parent = parent;
- }
-
- @Override
- public boolean hasNext() {
- return parent.hasNext();
- }
-
- @Override
- public SolrInputDocument next() {
- return parent.next().getSolrInputDocument();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
-}
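
A quick sketch of the UPDATE_CONFLICT_RESOLVER hook described above; selecting NoChangeUpdateConflictResolver also enables the fast path in resolve():

    package org.apache.solr.hadoop;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.solr.hadoop.dedup.NoChangeUpdateConflictResolver;
    import org.apache.solr.hadoop.dedup.UpdateConflictResolver;

    public class ConflictResolverConfig {
      public static void main(String[] args) {
        // When the key is unset, setup() defaults to
        // RetainMostRecentUpdateConflictResolver.
        Configuration conf = new Configuration();
        conf.setClass(SolrReducer.UPDATE_CONFLICT_RESOLVER,
            NoChangeUpdateConflictResolver.class, UpdateConflictResolver.class);
        System.out.println(conf.get(SolrReducer.UPDATE_CONFLICT_RESOLVER));
      }
    }
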
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ToolRunnerHelpFormatter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ToolRunnerHelpFormatter.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ToolRunnerHelpFormatter.java
deleted file mode 100644
index 7570493..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ToolRunnerHelpFormatter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.helper.ASCIITextWidthCounter;
-import net.sourceforge.argparse4j.helper.TextHelper;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Nicely formats the output of
- * {@link ToolRunner#printGenericCommandUsage(PrintStream)} with the same look and feel that argparse4j uses for help text.
- */
-class ToolRunnerHelpFormatter {
-
- public static String getGenericCommandUsage() {
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- String msg;
- try {
- ToolRunner.printGenericCommandUsage(new PrintStream(bout, true, "UTF-8"));
- msg = new String(bout.toByteArray(), StandardCharsets.UTF_8);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e); // unreachable
- }
-
- BufferedReader reader = new BufferedReader(new StringReader(msg));
- StringBuilder result = new StringBuilder();
- while (true) {
- String line;
- try {
- line = reader.readLine();
- } catch (IOException e) {
- throw new RuntimeException(e); // unreachable
- }
-
- if (line == null) {
- return result.toString(); // EOS
- }
-
- if (!line.startsWith("-")) {
- result.append(line + "\n");
- } else {
- line = line.trim();
- int i = line.indexOf(" ");
- if (i < 0) {
- i = line.indexOf('\t');
- }
- if (i < 0) {
- result.append(line + "\n");
- } else {
- String title = line.substring(0, i).trim();
- if (title.length() >= 3 && Character.isLetterOrDigit(title.charAt(1)) && Character.isLetterOrDigit(title.charAt(2))) {
- title = "-" + title; // prefer "--libjars" long arg style over "-libjars" style but retain "-D foo" short arg style
- }
- String help = line.substring(i, line.length()).trim();
- StringWriter strWriter = new StringWriter();
- PrintWriter writer = new PrintWriter(strWriter, true);
- TextHelper.printHelp(writer, title, help, new ASCIITextWidthCounter(), ArgumentParsers.getFormatWidth());
- result.append(strWriter.toString());
- }
- }
- }
- }
-}
-
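
A minimal sketch of calling the formatter; the class is package-private, so a caller must live in org.apache.solr.hadoop:

    package org.apache.solr.hadoop;

    public class GenericUsageDemo {
      public static void main(String[] args) {
        // Prints Hadoop's generic option help re-wrapped in argparse4j's layout.
        System.out.println(ToolRunnerHelpFormatter.getGenericCommandUsage());
      }
    }
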
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
deleted file mode 100644
index ee34a9c..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeMapper.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * For the meat see {@link TreeMergeOutputFormat}.
- */
-public class TreeMergeMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String MAX_SEGMENTS_ON_TREE_MERGE = "maxSegmentsOnTreeMerge";
-
- public static final String SOLR_SHARD_NUMBER = "_solrShardNumber";
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- LOGGER.trace("map key: {}, value: {}", key, value);
- context.write(value, NullWritable.get());
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
deleted file mode 100644
index cac57c3..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.IndexWriterConfig.OpenMode;
-import org.apache.lucene.index.LogMergePolicy;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.misc.IndexMergeTool;
-import org.apache.lucene.store.Directory;
-import org.apache.solr.store.hdfs.HdfsDirectory;
-import org.apache.solr.update.SolrIndexWriter;
-import org.apache.solr.util.RTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * See {@link IndexMergeTool}.
- */
-public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable> {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- @Override
- public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException {
- Utils.getLogConfigFile(context.getConfiguration());
- Path workDir = getDefaultWorkFile(context, "");
- return new TreeMergeRecordWriter(context, workDir);
- }
-
-
- ///////////////////////////////////////////////////////////////////////////////
- // Nested classes:
- ///////////////////////////////////////////////////////////////////////////////
- private static final class TreeMergeRecordWriter extends RecordWriter<Text,NullWritable> {
-
- private final Path workDir;
- private final List<Path> shards = new ArrayList<>();
- private final HeartBeater heartBeater;
- private final TaskAttemptContext context;
-
- private static final Logger LOG = log;
-
- public TreeMergeRecordWriter(TaskAttemptContext context, Path workDir) {
- this.workDir = new Path(workDir, "data/index");
- this.heartBeater = new HeartBeater(context);
- this.context = context;
- }
-
- @Override
- public void write(Text key, NullWritable value) {
- LOG.info("map key: {}", key);
- heartBeater.needHeartBeat();
- try {
- Path path = new Path(key.toString());
- shards.add(path);
- } finally {
- heartBeater.cancelHeartBeat();
- }
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException {
- LOG.debug("Task " + context.getTaskAttemptID() + " merging into dstDir: " + workDir + ", srcDirs: " + shards);
- writeShardNumberFile(context);
- heartBeater.needHeartBeat();
- try {
- Directory mergedIndex = new HdfsDirectory(workDir, context.getConfiguration());
-
- // TODO: shouldn't we pull the Version from the solrconfig.xml?
- IndexWriterConfig writerConfig = new IndexWriterConfig(null)
- .setOpenMode(OpenMode.CREATE).setUseCompoundFile(false)
- //.setMergePolicy(mergePolicy) // TODO: grab tuned MergePolicy from solrconfig.xml?
- //.setMergeScheduler(...) // TODO: grab tuned MergeScheduler from solrconfig.xml?
- ;
-
- if (LOG.isDebugEnabled()) {
- writerConfig.setInfoStream(System.out);
- }
-// writerConfig.setRAMBufferSizeMB(100); // improve performance
-// writerConfig.setMaxThreadStates(1);
-
- // disable compound file to improve performance
- // also see http://lucene.472066.n3.nabble.com/Questions-on-compound-file-format-td489105.html
- // also see defaults in SolrIndexConfig
- MergePolicy mergePolicy = writerConfig.getMergePolicy();
- LOG.debug("mergePolicy was: {}", mergePolicy);
- if (mergePolicy instanceof TieredMergePolicy) {
- ((TieredMergePolicy) mergePolicy).setNoCFSRatio(0.0);
-// ((TieredMergePolicy) mergePolicy).setMaxMergeAtOnceExplicit(10000);
-// ((TieredMergePolicy) mergePolicy).setMaxMergeAtOnce(10000);
-// ((TieredMergePolicy) mergePolicy).setSegmentsPerTier(10000);
- } else if (mergePolicy instanceof LogMergePolicy) {
- ((LogMergePolicy) mergePolicy).setNoCFSRatio(0.0);
- }
- LOG.info("Using mergePolicy: {}", mergePolicy);
-
- IndexWriter writer = new IndexWriter(mergedIndex, writerConfig);
-
- Directory[] indexes = new Directory[shards.size()];
- for (int i = 0; i < shards.size(); i++) {
- indexes[i] = new HdfsDirectory(shards.get(i), context.getConfiguration());
- }
-
- context.setStatus("Logically merging " + shards.size() + " shards into one shard");
- LOG.info("Logically merging " + shards.size() + " shards into one shard: " + workDir);
- RTimer timer = new RTimer();
-
- writer.addIndexes(indexes);
- // TODO: avoid intermediate copying of files into dst directory; rename the files into the dir instead (cp -> rename)
- // This can improve performance and turns this phase into a true "logical" merge, completing in constant time.
- // See https://issues.apache.org/jira/browse/LUCENE-4746
-
- timer.stop();
- if (LOG.isDebugEnabled()) {
- context.getCounter(SolrCounters.class.getName(), SolrCounters.LOGICAL_TREE_MERGE_TIME.toString()).increment((long) timer.getTime());
- }
- LOG.info("Logical merge took {}ms", timer.getTime());
- int maxSegments = context.getConfiguration().getInt(TreeMergeMapper.MAX_SEGMENTS_ON_TREE_MERGE, Integer.MAX_VALUE);
- context.setStatus("Optimizing Solr: forcing mtree merge down to " + maxSegments + " segments");
- LOG.info("Optimizing Solr: forcing tree merge down to {} segments", maxSegments);
- timer = new RTimer();
- if (maxSegments < Integer.MAX_VALUE) {
- writer.forceMerge(maxSegments);
- // TODO: consider perf enhancement for no-deletes merges: bulk-copy the postings data
- // see http://lucene.472066.n3.nabble.com/Experience-with-large-merge-factors-tp1637832p1647046.html
- }
- timer.stop();
- if (LOG.isDebugEnabled()) {
- context.getCounter(SolrCounters.class.getName(), SolrCounters.PHYSICAL_TREE_MERGE_TIME.toString()).increment((long) timer.getTime());
- }
- LOG.info("Optimizing Solr: done forcing tree merge down to {} segments in {}ms", maxSegments, timer.getTime());
-
- // Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on
- // commitTimeMSec in the commit data to do replication.
- //TODO no commitUpdateCommand
- SolrIndexWriter.setCommitData(writer, -1);
-
- timer = new RTimer();
- LOG.info("Optimizing Solr: Closing index writer");
- writer.close();
- LOG.info("Optimizing Solr: Done closing index writer in {}ms", timer.getTime());
- context.setStatus("Done");
- } finally {
- heartBeater.cancelHeartBeat();
- heartBeater.close();
- }
- }
-
- /*
- * For background see MapReduceIndexerTool.renameTreeMergeShardDirs()
- *
- * Also see MapReduceIndexerTool.run() method where it uses
- * NLineInputFormat.setNumLinesPerSplit(job, options.fanout)
- */
- private void writeShardNumberFile(TaskAttemptContext context) throws IOException {
- Preconditions.checkArgument(shards.size() > 0);
- String shard = shards.get(0).getParent().getParent().getName(); // move up from "data/index"
- String taskId = shard.substring("part-m-".length()); // shard dir is named e.g. "part-m-00001"
- int taskNum = Integer.parseInt(taskId);
- int outputShardNum = taskNum / shards.size();
- LOG.debug("Merging into outputShardNum: {} from taskId: {}", outputShardNum, taskId);
- Path shardNumberFile = new Path(workDir.getParent().getParent(), TreeMergeMapper.SOLR_SHARD_NUMBER);
- OutputStream out = shardNumberFile.getFileSystem(context.getConfiguration()).create(shardNumberFile);
- try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) { // close the stream even if the write fails
- writer.write(String.valueOf(outputShardNum));
- }
- }
- }
-}
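For reference, the merge logic removed above reduces to a two-phase pattern: a "logical" merge via IndexWriter.addIndexes(Directory...), which cheaply adopts the shard segments, followed by an optional physical forceMerge. A minimal local-filesystem sketch of that same pattern, using FSDirectory in place of HdfsDirectory; the class name and argument handling here are illustrative only, and no analyzer is configured because addIndexes does not need one:

    import java.nio.file.Paths;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class TreeMergeSketch {
      // args[0] = destination index dir, args[1..] = shard index dirs to merge
      public static void main(String[] args) throws Exception {
        try (Directory merged = FSDirectory.open(Paths.get(args[0]));
             IndexWriter writer = new IndexWriter(merged, new IndexWriterConfig(null))) {
          Directory[] shards = new Directory[args.length - 1];
          for (int i = 1; i < args.length; i++) {
            shards[i - 1] = FSDirectory.open(Paths.get(args[i])); // left open for brevity
          }
          writer.addIndexes(shards); // logical merge: fast, just registers the segments
          writer.forceMerge(1);      // physical merge down to a single segment
        }
      }
    }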
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/UnbufferedDataInputInputStream.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/UnbufferedDataInputInputStream.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/UnbufferedDataInputInputStream.java
deleted file mode 100644
index 83823ce..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/UnbufferedDataInputInputStream.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.BufferedReader;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-
-public class UnbufferedDataInputInputStream extends org.apache.solr.common.util.DataInputInputStream {
- private final DataInputStream in;
-
- public UnbufferedDataInputInputStream(DataInput in) {
- this.in = new DataInputStream(DataInputInputStream.constructInputStream(in));
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- in.readFully(b);
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- in.readFully(b, off, len);
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- return in.skipBytes(n);
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- return in.readBoolean();
- }
-
- @Override
- public byte readByte() throws IOException {
- return in.readByte();
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- return in.readUnsignedByte();
- }
-
- @Override
- public short readShort() throws IOException {
- return in.readShort();
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- return in.readUnsignedShort();
- }
-
- @Override
- public char readChar() throws IOException {
- return in.readChar();
- }
-
- @Override
- public int readInt() throws IOException {
- return in.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return in.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return in.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return in.readDouble();
- }
-
- @Override
- public String readLine() throws IOException {
- // Note: the BufferedReader may read ahead of the returned line, consuming extra bytes from the underlying stream.
- BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
- return reader.readLine();
- }
-
- @Override
- public String readUTF() throws IOException {
- return in.readUTF();
- }
-
- @Override
- public int read() throws IOException {
- return in.read();
- }
-
-}
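The adapter above exists so that a Hadoop DataInput handed to a Writable can be consumed by APIs expecting Solr's DataInputInputStream. A hedged sketch of typical consumption inside a Writable's readFields(); the surrounding Writable and the use of JavaBinCodec here are illustrative assumptions, not a quote of the removed callers:

    @Override
    public void readFields(DataInput in) throws IOException {
      // Bridge Hadoop's DataInput to an InputStream-style reader so the
      // javabin codec can deserialize a document payload from it.
      SolrInputDocument doc = (SolrInputDocument)
          new JavaBinCodec().unmarshal(new UnbufferedDataInputInputStream(in));
      // ... use doc ...
    }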
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/Utils.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/Utils.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/Utils.java
deleted file mode 100644
index fa10154..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/Utils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.File;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.PropertyConfigurator;
-
-import com.google.common.annotations.Beta;
-import org.apache.solr.common.util.SuppressForbidden;
-
-
-@Beta
-public final class Utils {
-
- private static final String LOG_CONFIG_FILE = "hadoop.log4j.configuration";
-
- public static void setLogConfigFile(File file, Configuration conf) {
- conf.set(LOG_CONFIG_FILE, file.getName());
- }
-
- /** Despite the "get" prefix, this applies the log4j properties file referenced by the configuration. */
- public static void getLogConfigFile(Configuration conf) {
- String log4jPropertiesFile = conf.get(LOG_CONFIG_FILE);
- configureLog4jProperties(log4jPropertiesFile);
- }
-
- @SuppressForbidden(reason = "method is specific to log4j")
- public static void configureLog4jProperties(String log4jPropertiesFile) {
- if (log4jPropertiesFile != null) {
- PropertyConfigurator.configure(log4jPropertiesFile);
- }
- }
-
- public static String getShortClassName(Class<?> clazz) {
- return getShortClassName(clazz.getName());
- }
-
- public static String getShortClassName(String className) {
- int i = className.lastIndexOf('.'); // regular class
- int j = className.lastIndexOf('$'); // inner class
- return className.substring(1 + Math.max(i, j));
- }
-
-}
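The class-name helper at the bottom simply keeps everything after the last '.' or '$'. A couple of illustrative assertions of that behavior (fragment only; assumes assertions are enabled):

    assert Utils.getShortClassName("org.apache.solr.hadoop.SolrReducer").equals("SolrReducer");
    assert Utils.getShortClassName("com.example.Outer$Inner").equals("Inner"); // inner classes keep only the innermost name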
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
deleted file mode 100644
index 76928aa..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import com.google.common.io.Files;
-import org.apache.commons.io.FileUtils;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extracts SolrCloud information from ZooKeeper.
- */
-final class ZooKeeperInspector {
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public List<List<String>> extractShardUrls(String zkHost, String collection) {
-
- DocCollection docCollection = extractDocCollection(zkHost, collection);
- List<Slice> slices = getSortedSlices(docCollection.getSlices());
- List<List<String>> solrUrls = new ArrayList<>(slices.size());
- for (Slice slice : slices) {
- if (slice.getLeader() == null) {
- throw new IllegalArgumentException("Cannot find SolrCloud slice leader. " +
- "It looks like not all of your shards are registered in ZooKeeper yet");
- }
- Collection<Replica> replicas = slice.getReplicas();
- List<String> urls = new ArrayList<>(replicas.size());
- for (Replica replica : replicas) {
- ZkCoreNodeProps props = new ZkCoreNodeProps(replica);
- urls.add(props.getCoreUrl());
- }
- solrUrls.add(urls);
- }
- return solrUrls;
- }
-
- public DocCollection extractDocCollection(String zkHost, String collection) {
- if (collection == null) {
- throw new IllegalArgumentException("collection must not be null");
- }
- SolrZkClient zkClient = getZkClient(zkHost);
-
- try (ZkStateReader zkStateReader = new ZkStateReader(zkClient)) {
- try {
- // first check for alias
- collection = checkForAlias(zkClient, collection);
- zkStateReader.createClusterStateWatchersAndUpdate();
- } catch (Exception e) {
- throw new IllegalArgumentException("Cannot find expected information for SolrCloud in ZooKeeper: " + zkHost, e);
- }
-
- try {
- return zkStateReader.getClusterState().getCollection(collection);
- } catch (SolrException e) {
- throw new IllegalArgumentException("Cannot find collection '" + collection + "' in ZooKeeper: " + zkHost, e);
- }
- } finally {
- zkClient.close();
- }
- }
-
- public SolrZkClient getZkClient(String zkHost) {
- if (zkHost == null) {
- throw new IllegalArgumentException("zkHost must not be null");
- }
-
- SolrZkClient zkClient;
- try {
- zkClient = new SolrZkClient(zkHost, 30000);
- } catch (Exception e) {
- throw new IllegalArgumentException("Cannot connect to ZooKeeper: " + zkHost, e);
- }
- return zkClient;
- }
-
- public List<Slice> getSortedSlices(Collection<Slice> slices) {
- List<Slice> sorted = new ArrayList<>(slices);
- Comparator c = new AlphaNumericComparator(); // instantiate once rather than on every comparison
- Collections.sort(sorted, (slice1, slice2) -> c.compare(slice1.getName(), slice2.getName()));
- LOG.trace("Sorted slices: {}", sorted);
- return sorted;
- }
-
- /**
- * Returns the config name for the given collection.
- * Borrowed heavily from Solr's ZkController.
- */
- public String readConfigName(SolrZkClient zkClient, String collection)
- throws KeeperException, InterruptedException {
- if (collection == null) {
- throw new IllegalArgumentException("collection must not be null");
- }
- String configName = null;
-
- // first check for alias
- collection = checkForAlias(zkClient, collection);
-
- String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
- if (LOG.isInfoEnabled()) {
- LOG.info("Load collection config from:" + path);
- }
- byte[] data = zkClient.getData(path, null, null, true);
-
- if (data != null) {
- ZkNodeProps props = ZkNodeProps.load(data);
- configName = props.getStr(ZkController.CONFIGNAME_PROP);
- }
-
- if (configName != null && !zkClient.exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, true)) {
- LOG.error("Specified config does not exist in ZooKeeper:" + configName);
- throw new IllegalArgumentException("Specified config does not exist in ZooKeeper:"
- + configName);
- }
-
- return configName;
- }
-
- private String checkForAlias(SolrZkClient zkClient, String collection)
- throws KeeperException, InterruptedException {
- byte[] aliasData = zkClient.getData(ZkStateReader.ALIASES, null, null, true);
- Aliases aliases = ClusterState.load(aliasData);
- String alias = aliases.getCollectionAlias(collection);
- if (alias != null) {
- List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
- if (aliasList.size() > 1) {
- throw new IllegalArgumentException("collection cannot be an alias that maps to multiple collections");
- }
- collection = aliasList.get(0);
- }
- return collection;
- }
-
- /**
- * Download and return the config directory from ZK
- */
- public File downloadConfigDir(SolrZkClient zkClient, String configName)
- throws IOException, InterruptedException, KeeperException {
- File dir = Files.createTempDir();
- dir.deleteOnExit();
- ZkConfigManager configManager = new ZkConfigManager(zkClient);
- configManager.downloadConfigDir(configName, dir.toPath());
- File confDir = new File(dir, "conf");
- if (!confDir.isDirectory()) {
- // create a temporary directory with "conf" subdir and mv the config in there. This is
- // necessary because of CDH-11188; solrctl does not generate nor accept directories with e.g.
- // conf/solrconfig.xml which is necessary for proper solr operation. This should work
- // even if solrctl changes.
- confDir = new File(Files.createTempDir().getAbsolutePath(), "conf");
- confDir.getParentFile().deleteOnExit();
- Files.move(dir, confDir);
- dir = confDir.getParentFile();
- }
- FileUtils.writeStringToFile(new File(dir, "solr.xml"), "<solr><solrcloud></solrcloud></solr>", "UTF-8");
- verifyConfigDir(confDir);
- return dir;
- }
-
- private void verifyConfigDir(File confDir) throws IOException {
- File solrConfigFile = new File(confDir, "solrconfig.xml");
- if (!solrConfigFile.exists()) {
- throw new IOException("Detected invalid Solr config dir in ZooKeeper - Reason: File not found: "
- + solrConfigFile.getName());
- }
- if (!solrConfigFile.isFile()) {
- throw new IOException("Detected invalid Solr config dir in ZooKeeper - Reason: Not a file: "
- + solrConfigFile.getName());
- }
- if (!solrConfigFile.canRead()) {
- throw new IOException("Insufficient permissions to read file: " + solrConfigFile);
- }
- }
-
-}
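A hedged usage sketch of the inspector deleted above; since the class is package-private, this assumes calling code in org.apache.solr.hadoop, and the ZooKeeper host string and collection name are placeholders:

    ZooKeeperInspector inspector = new ZooKeeperInspector();
    // One inner list per slice, each holding the replica core URLs for that slice.
    List<List<String>> shardUrls =
        inspector.extractShardUrls("zk1:2181,zk2:2181/solr", "collection1");
    for (List<String> replicaUrls : shardUrls) {
      System.out.println("slice replicas: " + replicaUrls);
    }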
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/NoChangeUpdateConflictResolver.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/NoChangeUpdateConflictResolver.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/NoChangeUpdateConflictResolver.java
deleted file mode 100644
index 0eae940..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/NoChangeUpdateConflictResolver.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.dedup;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.solr.common.SolrInputDocument;
-
-/**
- * UpdateConflictResolver implementation that returns the Solr documents in the
- * same order as they are received on input, i.e. unchanged.
- */
-public final class NoChangeUpdateConflictResolver implements UpdateConflictResolver {
-
- @Override
- public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
- return updates;
- }
-
-}
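Its contract is easy to state: the iterator passed in comes back untouched. A tiny behavior sketch; doc1, doc2, key and ctx are assumed to exist in the surrounding test or reducer code:

    Iterator<SolrInputDocument> updates = Arrays.asList(doc1, doc2).iterator();
    // The pass-through resolver returns the very same iterator, preserving input order.
    assert new NoChangeUpdateConflictResolver().orderUpdates(key, updates, ctx) == updates;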
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RejectingUpdateConflictResolver.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RejectingUpdateConflictResolver.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RejectingUpdateConflictResolver.java
deleted file mode 100644
index 60efb4c..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RejectingUpdateConflictResolver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.dedup;
-
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.solr.common.SolrInputDocument;
-
-/**
- * UpdateConflictResolver implementation that rejects multiple documents with
- * the same unique key by throwing an exception.
- */
-public final class RejectingUpdateConflictResolver implements UpdateConflictResolver {
-
- @Override
- public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
- SolrInputDocument firstUpdate = null;
- while (updates.hasNext()) {
- if (firstUpdate == null) {
- firstUpdate = updates.next();
- assert firstUpdate != null;
- } else {
- throw new IllegalArgumentException("Update conflict! Documents with the same unique key are forbidden: "
- + key);
- }
- }
- assert firstUpdate != null;
- return Collections.singletonList(firstUpdate).iterator();
- }
-
-}
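In other words, this resolver tolerates exactly one update per unique key; a second document for the same key aborts the run. A short behavior sketch, with key, ctx and doc assumed from the surrounding code:

    UpdateConflictResolver resolver = new RejectingUpdateConflictResolver();
    // Exactly one update for the key passes through as a singleton iterator:
    resolver.orderUpdates(key, Collections.singletonList(doc).iterator(), ctx);
    // Two or more updates for the same key throw:
    // IllegalArgumentException("Update conflict! Documents with the same unique key are forbidden: " + key)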
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RetainMostRecentUpdateConflictResolver.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RetainMostRecentUpdateConflictResolver.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RetainMostRecentUpdateConflictResolver.java
deleted file mode 100644
index c2637ce..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/RetainMostRecentUpdateConflictResolver.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.dedup;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.hadoop.HdfsFileFieldNames;
-import org.apache.solr.hadoop.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * UpdateConflictResolver implementation that ignores all but the most recent
- * document version, based on a configurable numeric Solr field, which defaults
- * to the file_last_modified timestamp.
- */
-public class RetainMostRecentUpdateConflictResolver implements UpdateConflictResolver, Configurable {
-
- private Configuration conf;
- private String orderByFieldName = ORDER_BY_FIELD_NAME_DEFAULT;
-
- public static final String ORDER_BY_FIELD_NAME_KEY =
- RetainMostRecentUpdateConflictResolver.class.getName() + ".orderByFieldName";
-
- public static final String ORDER_BY_FIELD_NAME_DEFAULT = HdfsFileFieldNames.FILE_LAST_MODIFIED;
-
- public static final String COUNTER_GROUP = Utils.getShortClassName(RetainMostRecentUpdateConflictResolver.class);
- public static final String DUPLICATES_COUNTER_NAME = "Number of documents ignored as duplicates";
- public static final String OUTDATED_COUNTER_NAME = "Number of documents ignored as outdated";
-
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- this.orderByFieldName = conf.get(ORDER_BY_FIELD_NAME_KEY, orderByFieldName);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- protected String getOrderByFieldName() {
- return orderByFieldName;
- }
-
- @Override
- public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
- return getMaximum(updates, getOrderByFieldName(), new SolrInputDocumentComparator.TimeStampComparator(), ctx);
- }
-
- /** Returns the most recent document among the colliding updates */
- protected Iterator<SolrInputDocument> getMaximum(Iterator<SolrInputDocument> updates, String fieldName,
- Comparator child, Context context) {
-
- SolrInputDocumentComparator comp = new SolrInputDocumentComparator(fieldName, child);
- SolrInputDocument max = null;
- long numDupes = 0;
- long numOutdated = 0;
- while (updates.hasNext()) {
- SolrInputDocument next = updates.next();
- assert next != null;
- if (max == null) {
- max = next;
- } else {
- int c = comp.compare(next, max);
- if (c == 0) {
- LOG.debug("Ignoring document version because it is a duplicate: {}", next);
- numDupes++;
- } else if (c > 0) {
- LOG.debug("Ignoring document version because it is outdated: {}", max);
- max = next;
- numOutdated++;
- } else {
- LOG.debug("Ignoring document version because it is outdated: {}", next);
- numOutdated++;
- }
- }
- }
-
- assert max != null;
- if (numDupes > 0) {
- context.getCounter(COUNTER_GROUP, DUPLICATES_COUNTER_NAME).increment(numDupes);
- }
- if (numOutdated > 0) {
- context.getCounter(COUNTER_GROUP, OUTDATED_COUNTER_NAME).increment(numOutdated);
- }
- return Collections.singletonList(max).iterator();
- }
-
-}
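A behavior sketch of the getMaximum() logic above, using the default file_last_modified ordering; key and ctx are assumed, and the field values are arbitrary epoch timestamps:

    SolrInputDocument older = new SolrInputDocument();
    older.setField("file_last_modified", 1000L);
    SolrInputDocument newer = new SolrInputDocument();
    newer.setField("file_last_modified", 2000L);

    RetainMostRecentUpdateConflictResolver resolver = new RetainMostRecentUpdateConflictResolver();
    // Only the document with the greatest order-by value survives; the older one is
    // dropped and counted under "Number of documents ignored as outdated".
    Iterator<SolrInputDocument> kept =
        resolver.orderUpdates(key, Arrays.asList(older, newer).iterator(), ctx);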
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1a574df/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/SolrInputDocumentComparator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/SolrInputDocumentComparator.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/SolrInputDocumentComparator.java
deleted file mode 100644
index e8cfdbb..0000000
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/dedup/SolrInputDocumentComparator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.hadoop.dedup;
-
-import java.util.Comparator;
-
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
-
-/**
- * Default mechanism of determining which of two Solr documents with the same
- * key is the more recent version.
- */
-public final class SolrInputDocumentComparator implements Comparator<SolrInputDocument> {
-
- private final Comparator child;
- private final String fieldName;
-
- SolrInputDocumentComparator(String fieldName, Comparator child) {
- this.child = child;
- this.fieldName = fieldName;
- }
-
- @Override
- public int compare(SolrInputDocument doc1, SolrInputDocument doc2) {
- SolrInputField f1 = doc1.getField(fieldName);
- SolrInputField f2 = doc2.getField(fieldName);
- if (f1 == f2) {
- return 0;
- } else if (f1 == null) {
- return -1;
- } else if (f2 == null) {
- return 1;
- }
-
- Object v1 = f1.getFirstValue();
- Object v2 = f2.getFirstValue();
- return child.compare(v1, v2);
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- // Nested classes:
- ///////////////////////////////////////////////////////////////////////////////
- public static final class TimeStampComparator implements Comparator {
-
- @Override
- public int compare(Object v1, Object v2) {
- if (v1 == v2) {
- return 0;
- } else if (v1 == null) {
- return -1;
- } else if (v2 == null) {
- return 1;
- }
- long t1 = getLong(v1);
- long t2 = getLong(v2);
- return Long.compare(t1, t2);
- }
-
- private long getLong(Object v) {
- if (v instanceof Long) {
- return ((Long) v).longValue();
- } else {
- return Long.parseLong(v.toString());
- }
- }
-
- }
-
-}
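The comparator pair composes as follows: the outer comparator extracts one field from each document and delegates to a child value comparator such as TimeStampComparator. A minimal sketch; the constructor is package-private, so this assumes code living in org.apache.solr.hadoop.dedup:

    Comparator<SolrInputDocument> byModTime = new SolrInputDocumentComparator(
        "file_last_modified", new SolrInputDocumentComparator.TimeStampComparator());
    // Documents missing the field sort before any document that has it;
    // otherwise the first field values are compared as longs.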