You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/02/09 00:53:25 UTC
svn commit: r1658277 [19/38] - in /lucene/dev/branches/lucene6005: ./
dev-tools/ dev-tools/idea/solr/contrib/dataimporthandler/
dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/lucene/replicator/
dev-tools/maven/solr/ dev-tools/maven/solr/contrib/...
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java Sun Feb 8 23:53:14 2015
@@ -28,7 +28,6 @@ import org.apache.lucene.store.LockFacto
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.solr.common.SolrException;
@@ -113,8 +112,7 @@ public class StandardDirectoryFactory ex
* carefully - some Directory wrappers will
* cache files for example.
*
- * This implementation works with two wrappers:
- * NRTCachingDirectory and RateLimitedDirectoryWrapper.
+ * This implementation works with NRTCachingDirectory.
*
* You should first {@link Directory#sync(java.util.Collection)} any file that will be
* moved or avoid cached files through settings.
@@ -143,13 +141,11 @@ public class StandardDirectoryFactory ex
super.move(fromDir, toDir, fileName, ioContext);
}
- // special hack to work with NRTCachingDirectory and RateLimitedDirectoryWrapper
+ // special hack to work with NRTCachingDirectory
private Directory getBaseDir(Directory dir) {
Directory baseDir;
if (dir instanceof NRTCachingDirectory) {
baseDir = ((NRTCachingDirectory)dir).getDelegate();
- } else if (dir instanceof RateLimitedDirectoryWrapper) {
- baseDir = ((RateLimitedDirectoryWrapper)dir).getDelegate();
} else {
baseDir = dir;
}
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/ZkContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/ZkContainer.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/ZkContainer.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/core/ZkContainer.java Sun Feb 8 23:53:14 2015
@@ -180,16 +180,12 @@ public class ZkContainer {
log.error("Could not connect to ZooKeeper", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (KeeperException e) {
+ } catch (IOException | KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
-
+
}
this.zkController = zkController;
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/BlobHandler.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/BlobHandler.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/BlobHandler.java Sun Feb 8 23:53:14 2015
@@ -23,6 +23,7 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.UpdateParams;
@@ -51,6 +53,7 @@ import org.apache.solr.response.SolrQuer
import org.apache.solr.schema.FieldType;
import org.apache.solr.search.QParser;
import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.SimplePostTool;
@@ -58,6 +61,7 @@ import org.apache.solr.util.plugin.Plugi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Collections.singletonMap;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
public class BlobHandler extends RequestHandlerBase implements PluginInfoInitialized{
@@ -85,10 +89,12 @@ public class BlobHandler extends Request
}
String err = SolrConfigHandler.validateName(blobName);
if(err!=null){
+ log.warn("no blob name");
rsp.add("error", err);
return;
}
if(req.getContentStreams() == null ) {
+ log.warn("no content stream");
rsp.add("error","No stream");
return;
}
@@ -108,6 +114,7 @@ public class BlobHandler extends Request
"q", "md5:" + md5,
"fl", "id,size,version,timestamp,blobName")),
rsp);
+ log.warn("duplicate entry for blob :"+blobName);
return;
}
@@ -122,14 +129,19 @@ public class BlobHandler extends Request
}
version++;
String id = blobName+"/"+version;
- indexMap(req, makeMap(
+ Map<String, Object> doc = makeMap(
"id", id,
"md5", md5,
"blobName", blobName,
"version", version,
"timestamp", new Date(),
"size", payload.limit(),
- "blob", payload));
+ "blob", payload);
+ verifyWithRealtimeGet(blobName, version, req, doc);
+ log.info(MessageFormat.format("New blob inserting {0} ,size {1}, md5 {2}",doc.get("id"), payload.limit(),md5));
+ indexMap(req, rsp, doc);
+ log.info(" Successfully Added and committed a blob with id {} and size {} ",id, payload.limit());
+
break;
}
@@ -187,24 +199,44 @@ public class BlobHandler extends Request
req.forward(null,
new MapSolrParams((Map) makeMap(
- "q", MessageFormat.format(q,blobName,version),
- "fl", "id,size,version,timestamp,blobName",
+ "q", MessageFormat.format(q, blobName, version),
+ "fl", "id,size,version,timestamp,blobName,md5",
"sort", "version desc"))
- ,rsp);
+ , rsp);
}
}
}
- public static void indexMap(SolrQueryRequest req, Map<String, Object> doc) throws IOException {
+ private void verifyWithRealtimeGet(String blobName, long version, SolrQueryRequest req, Map<String, Object> doc) {
+ for(;;) {
+ SolrQueryResponse response = new SolrQueryResponse();
+ String id = blobName + "/" + version;
+ req.forward("/get", new MapSolrParams(singletonMap("id", id)), response);
+ if(response.getValues().get("doc") == null) {
+ //ensure that the version does not exist
+ return;
+ } else {
+ log.info("id {} already exists trying next ",id);
+ version++;
+ doc.put("version", version);
+ id = blobName + "/" + version;
+ doc.put("id", id);
+ }
+ }
+
+ }
+
+ public static void indexMap(SolrQueryRequest req, SolrQueryResponse rsp, Map<String, Object> doc) throws IOException {
SolrInputDocument solrDoc = new SolrInputDocument();
for (Map.Entry<String, Object> e : doc.entrySet()) solrDoc.addField(e.getKey(),e.getValue());
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(req.getParams().get(UpdateParams.UPDATE_CHAIN));
- UpdateRequestProcessor processor = processorChain.createProcessor(req,null);
+ UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
AddUpdateCommand cmd = new AddUpdateCommand(req);
- cmd.commitWithin =1;
cmd.solrDoc = solrDoc;
+ log.info("Adding doc "+doc);
processor.processAdd(cmd);
-
+ log.info("committing doc"+doc);
+ processor.processCommit(new CommitUpdateCommand(req, false));
}
@Override
@@ -245,7 +277,11 @@ public class BlobHandler extends Request
"<updateHandler class='solr.DirectUpdateHandler2'>\n" +
" <updateLog>\n" +
" <str name='dir'>${solr.ulog.dir:}</str>\n" +
- " </updateLog>\n" +
+ " </updateLog>\n " +
+ " <autoCommit> \n" +
+ " <maxDocs>1</maxDocs> \n" +
+ " <openSearcher>true</openSearcher> \n" +
+ " </autoCommit>" +
"</updateHandler>\n" +
"<requestHandler name='standard' class='solr.StandardRequestHandler' default='true' />\n" +
"<requestHandler name='/analysis/field' startup='lazy' class='solr.FieldAnalysisRequestHandler' />\n" +
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/DumpRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/DumpRequestHandler.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/DumpRequestHandler.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/DumpRequestHandler.java Sun Feb 8 23:53:14 2015
@@ -54,6 +54,12 @@ public class DumpRequestHandler extends
}
}
+ if(Boolean.TRUE.equals( req.getParams().getBool("getdefaults"))){
+ NamedList def = (NamedList) initArgs.get(PluginInfo.DEFAULTS);
+ rsp.add("getdefaults", def);
+ }
+
+
if(Boolean.TRUE.equals( req.getParams().getBool("initArgs"))) rsp.add("initArgs", initArgs);
// Write the streams...
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Sun Feb 8 23:53:14 2015
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
* @since solr 1.4
*/
public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
SolrCore core;
@@ -212,7 +212,7 @@ public class ReplicationHandler extends
doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req);
rsp.add(STATUS, OK_STATUS);
} else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) {
- deleteSnapshot(new ModifiableSolrParams(solrParams), rsp, req);
+ deleteSnapshot(new ModifiableSolrParams(solrParams));
rsp.add(STATUS, OK_STATUS);
} else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) {
String masterUrl = solrParams.get(MASTER_URL);
@@ -272,7 +272,7 @@ public class ReplicationHandler extends
}
}
- private void deleteSnapshot(ModifiableSolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
+ private void deleteSnapshot(ModifiableSolrParams params) {
String name = params.get("name");
if(name == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
@@ -329,11 +329,16 @@ public class ReplicationHandler extends
if (!snapPullLock.tryLock())
return false;
try {
- tempSnapPuller = snapPuller;
if (masterUrl != null) {
+ if (tempSnapPuller != null && tempSnapPuller != snapPuller) {
+ tempSnapPuller.destroy();
+ }
+
NamedList<Object> nl = solrParams.toNamedList();
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
+ } else {
+ tempSnapPuller = snapPuller;
}
return tempSnapPuller.fetchLatestIndex(core, forceReplication);
} catch (Exception e) {
@@ -572,7 +577,7 @@ public class ReplicationHandler extends
NamedList list = super.getStatistics();
if (core != null) {
list.add("indexSize", NumberUtils.readableSize(getIndexSize()));
- CommitVersionInfo vInfo = getIndexVersion();
+ CommitVersionInfo vInfo = (core != null && !core.isClosed()) ? getIndexVersion(): null;
list.add("indexVersion", null == vInfo ? 0 : vInfo.version);
list.add(GENERATION, null == vInfo ? 0 : vInfo.generation);
@@ -788,8 +793,8 @@ public class ReplicationHandler extends
} else if (clzz == List.class) {
String ss[] = s.split(",");
List<String> l = new ArrayList<>();
- for (int i = 0; i < ss.length; i++) {
- l.add(new Date(Long.valueOf(ss[i])).toString());
+ for (String s1 : ss) {
+ l.add(new Date(Long.valueOf(s1)).toString());
}
nl.add(key, l);
} else {
@@ -1001,6 +1006,9 @@ public class ReplicationHandler extends
if (snapPuller != null) {
snapPuller.destroy();
}
+ if (tempSnapPuller != null && tempSnapPuller != snapPuller) {
+ tempSnapPuller.destroy();
+ }
}
@Override
@@ -1174,6 +1182,7 @@ public class ReplicationHandler extends
offset = offset == -1 ? 0 : offset;
int read = (int) Math.min(buf.length, filelen - offset);
in.readBytes(buf, 0, read);
+
fos.writeInt(read);
if (useChecksum) {
checksum.reset();
@@ -1182,6 +1191,7 @@ public class ReplicationHandler extends
}
fos.write(buf, 0, read);
fos.flush();
+ LOG.debug("Wrote {} bytes for file {}", offset + read, fileName);
//Pause if necessary
maxBytesBeforePause += read;
@@ -1231,8 +1241,8 @@ public class ReplicationHandler extends
FileInputStream inputStream = null;
try {
initWrite();
-
- //if if is a conf file read from config diectory
+
+ //if it is a conf file read from config directory
File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
if (file.exists() && file.canRead()) {
@@ -1356,7 +1366,7 @@ public class ReplicationHandler extends
* Boolean param for tests that can be specified when using
* {@link #CMD_FETCH_INDEX} to force the current request to block until
* the fetch is complete. <b>NOTE:</b> This param is not advised for
- * non-test code, since the the durration of the fetch for non-trivial
+ * non-test code, since the duration of the fetch for non-trivial
* indexes will likeley cause the request to time out.
*
* @lucene.internal
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Sun Feb 8 23:53:14 2015
@@ -20,6 +20,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -32,7 +34,6 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
@@ -73,10 +74,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -176,14 +179,11 @@ public class SnapPuller {
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
- HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
-
- return httpClient;
+ return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
}
public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
- final SolrParams params = SolrParams.toSolrParams(initArgs);
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -246,19 +246,16 @@ public class SnapPuller {
params.set(CommonParams.WT, "javabin");
params.set(CommonParams.QT, "/replication");
QueryRequest req = new QueryRequest(params);
- HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient); //XXX modify to use shardhandler
- NamedList rsp;
- try {
+
+ // TODO modify to use shardhandler
+ try (HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient)) {
client.setSoTimeout(60000);
client.setConnectionTimeout(15000);
- rsp = client.request(req);
+ return client.request(req);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
- } finally {
- client.shutdown();
}
- return rsp;
}
/**
@@ -271,8 +268,9 @@ public class SnapPuller {
params.set(CommonParams.WT, "javabin");
params.set(CommonParams.QT, "/replication");
QueryRequest req = new QueryRequest(params);
- HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient); //XXX modify to use shardhandler
- try {
+
+ // TODO modify to use shardhandler
+ try (HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient)) {
client.setSoTimeout(60000);
client.setConnectionTimeout(15000);
NamedList response = client.request(req);
@@ -291,8 +289,6 @@ public class SnapPuller {
} catch (SolrServerException e) {
throw new IOException(e);
- } finally {
- client.shutdown();
}
}
@@ -407,13 +403,41 @@ public class SnapPuller {
}
if (!isFullCopyNeeded) {
- // rollback - and do it before we download any files
- // so we don't remove files we thought we didn't need
- // to download later
- solrCore.getUpdateHandler().getSolrCoreState()
- .closeIndexWriter(core, true);
+ // a searcher might be using some flushed but not committed segments
+ // because of soft commits (which open a searcher on IW's data)
+ // so we need to close the existing searcher on the last commit
+ // and wait until we are able to clean up all unused lucene files
+ if (solrCore.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
+ solrCore.closeSearcher();
+ }
+
+ // rollback and reopen index writer and wait until all unused files
+ // are successfully deleted
+ solrCore.getUpdateHandler().newIndexWriter(true);
+ RefCounted<IndexWriter> writer = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+ try {
+ IndexWriter indexWriter = writer.get();
+ int c = 0;
+ indexWriter.deleteUnusedFiles();
+ while (hasUnusedFiles(indexDir, commit)) {
+ indexWriter.deleteUnusedFiles();
+ LOG.info("Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
+ Thread.sleep(1000);
+ c++;
+ if (c >= 30) {
+ LOG.warn("SnapPuller unable to cleanup unused lucene index files so we must do a full copy instead");
+ isFullCopyNeeded = true;
+ break;
+ }
+ }
+ if (c > 0) {
+ LOG.info("SnapPuller slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able");
+ }
+ } finally {
+ writer.decref();
+ }
+ solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(core, true);
}
-
boolean reloadCore = false;
try {
@@ -490,7 +514,7 @@ public class SnapPuller {
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
}
- openNewSearcherAndUpdateCommitPoint(isFullCopyNeeded);
+ openNewSearcherAndUpdateCommitPoint();
}
replicationStartTime = 0;
@@ -544,6 +568,24 @@ public class SnapPuller {
}
}
+ private boolean hasUnusedFiles(Directory indexDir, IndexCommit commit) throws IOException {
+ Set<String> currentFiles = new HashSet<>();
+ String segmentsFileName = commit.getSegmentsFileName();
+ SegmentInfos infos = SegmentInfos.readCommit(indexDir, segmentsFileName);
+ for (SegmentCommitInfo info : infos.asList()) {
+ Set<String> files = info.info.files(); // All files that belong to this segment
+ currentFiles.addAll(files);
+ }
+ String[] allFiles = indexDir.listAll();
+ for (String file : allFiles) {
+ if (!file.equals(segmentsFileName) && !currentFiles.contains(file) && !file.endsWith(".lock")) {
+ LOG.info("Found unused file: " + file);
+ return true;
+ }
+ }
+ return false;
+ }
+
private volatile Exception fsyncException;
/**
@@ -651,9 +693,7 @@ public class SnapPuller {
List<String> l = new ArrayList<>();
if (str != null && str.length() != 0) {
String[] ss = str.split(",");
- for (int i = 0; i < ss.length; i++) {
- l.add(ss[i]);
- }
+ Collections.addAll(l, ss);
}
sb.append(replicationTime);
if (!l.isEmpty()) {
@@ -666,7 +706,7 @@ public class SnapPuller {
return sb;
}
- private void openNewSearcherAndUpdateCommitPoint(boolean isFullCopyNeeded) throws IOException {
+ private void openNewSearcherAndUpdateCommitPoint() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
@@ -678,9 +718,7 @@ public class SnapPuller {
if (waitSearcher[0] != null) {
try {
waitSearcher[0].get();
- } catch (InterruptedException e) {
- SolrException.log(LOG, e);
- } catch (ExecutionException e) {
+ } catch (InterruptedException | ExecutionException e) {
SolrException.log(LOG, e);
}
}
@@ -703,8 +741,7 @@ public class SnapPuller {
private String createTempindexDir(SolrCore core, String tmpIdxDirName) {
// TODO: there should probably be a DirectoryFactory#concatPath(parent, name)
// or something
- String tmpIdxDir = core.getDataDir() + tmpIdxDirName;
- return tmpIdxDir;
+ return core.getDataDir() + tmpIdxDirName;
}
private void reloadCore() {
@@ -731,7 +768,7 @@ public class SnapPuller {
private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
- confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
+ confFilesDownloaded = Collections.synchronizedList(new ArrayList<>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
try {
boolean status = tmpconfDir.mkdirs();
@@ -822,7 +859,7 @@ public class SnapPuller {
* Copy a file by the File#renameTo() method. If it fails, it is considered a failure
* <p/>
*/
- private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
+ private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname) {
LOG.debug("Moving file: {}", fname);
boolean success = false;
try {
@@ -856,7 +893,6 @@ public class SnapPuller {
}
}
String segmentsFile = null;
- List<String> movedfiles = new ArrayList<>();
for (Map<String, Object> f : filesDownloaded) {
String fname = (String) f.get(NAME);
// the segments file must be copied last
@@ -868,12 +904,11 @@ public class SnapPuller {
segmentsFile = fname;
continue;
}
- if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
- movedfiles.add(fname);
+ if (!moveAFile(tmpIdxDir, indexDir, fname)) return false;
}
//copy the segments file last
if (segmentsFile != null) {
- if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false;
+ if (!moveAFile(tmpIdxDir, indexDir, segmentsFile)) return false;
}
return true;
}
@@ -899,7 +934,7 @@ public class SnapPuller {
private void copyTmpConfFiles2Conf(File tmpconfDir) {
boolean status = false;
File confDir = new File(solrCore.getResourceLoader().getConfigDir());
- for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<File>())) {
+ for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<>())) {
File oldFile = new File(confDir, file.getPath().substring(tmpconfDir.getPath().length(), file.getPath().length()));
if (!oldFile.getParentFile().exists()) {
status = oldFile.getParentFile().mkdirs();
@@ -1111,7 +1146,7 @@ public class SnapPuller {
return null;
tmp = new HashMap<>(tmp);
if (tmpFileFetcher != null)
- tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded);
+ tmp.put("bytesDownloaded", tmpFileFetcher.getBytesDownloaded());
return tmp;
}
@@ -1132,58 +1167,53 @@ public class SnapPuller {
}
}
+ private interface FileInterface {
+ public void sync() throws IOException;
+ public void write(byte[] buf, int packetSize) throws IOException;
+ public void close() throws Exception;
+ public void delete() throws Exception;
+ }
+
/**
* The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
*
* @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
*/
- private class DirectoryFileFetcher {
- boolean includeChecksum = true;
-
- Directory copy2Dir;
-
- String fileName;
-
- String saveAs;
-
- long size;
-
- long bytesDownloaded = 0;
-
- byte[] buf = new byte[1024 * 1024];
-
- Checksum checksum;
-
- int errorCount = 0;
-
+ private class FileFetcher {
+ private final FileInterface file;
+ private boolean includeChecksum = true;
+ private String fileName;
+ private String saveAs;
private boolean isConf;
-
- private boolean aborted = false;
-
private Long indexGen;
- private IndexOutput outStream;
+ private long size;
+ private long bytesDownloaded = 0;
+ private byte[] buf = new byte[1024 * 1024];
+ private Checksum checksum;
+ private int errorCount = 0;
+ private boolean aborted = false;
- DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+ FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
boolean isConf, long latestGen) throws IOException {
- this.copy2Dir = tmpIndexDir;
+ this.file = file;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
this.isConf = isConf;
this.saveAs = saveAs;
-
indexGen = latestGen;
-
- outStream = copy2Dir.createOutput(saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
-
if (includeChecksum)
checksum = new Adler32();
}
+ public long getBytesDownloaded() {
+ return bytesDownloaded;
+ }
+
/**
* The main method which downloads file
*/
- void fetchFile() throws Exception {
+ public void fetchFile() throws Exception {
try {
while (true) {
final FastInputStream is = getStream();
@@ -1202,12 +1232,12 @@ public class SnapPuller {
}
} finally {
cleanup();
- //if cleanup suceeds . The file is downloaded fully. do an fsync
+ //if cleanup succeeds . The file is downloaded fully. do an fsync
fsyncService.submit(new Runnable(){
@Override
public void run() {
try {
- copy2Dir.sync(Collections.singleton(saveAs));
+ file.sync();
} catch (IOException e) {
fsyncException = e;
}
@@ -1231,7 +1261,7 @@ public class SnapPuller {
//read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
- LOG.warn("No content received for file: " + currentFile);
+ LOG.warn("No content received for file: {}", fileName);
return NO_CONTENT;
}
if (buf.length < packetSize)
@@ -1249,45 +1279,45 @@ public class SnapPuller {
checksum.update(buf, 0, packetSize);
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
- LOG.error("Checksum not matched between client and server for: " + currentFile);
+ LOG.error("Checksum not matched between client and server for file: {}", fileName);
//if checksum is wrong it is a problem return for retry
return 1;
}
}
//if everything is fine, write down the packet to the file
- writeBytes(packetSize);
+ file.write(buf, packetSize);
bytesDownloaded += packetSize;
+ LOG.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
if (bytesDownloaded >= size)
return 0;
- //errorcount is always set to zero after a successful packet
+ //errorCount is always set to zero after a successful packet
errorCount = 0;
}
} catch (ReplicationHandlerException e) {
throw e;
} catch (Exception e) {
- LOG.warn("Error in fetching packets ", e);
- //for any failure , increment the error count
+ LOG.warn("Error in fetching file: {} (downloaded {} of {} bytes)",
+ fileName, bytesDownloaded, size, e);
+ //for any failure, increment the error count
errorCount++;
- //if it fails for the same pacaket for MAX_RETRIES fail and come out
+ //if it fails for the same packet for MAX_RETRIES fail and come out
if (errorCount > MAX_RETRIES) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Fetch failed for file:" + fileName, e);
+ "Failed to fetch file: " + fileName +
+ " (downloaded " + bytesDownloaded + " of " + size + " bytes" +
+ ", error count: " + errorCount + " > " + MAX_RETRIES + ")", e);
}
return ERR;
}
}
- protected void writeBytes(int packetSize) throws IOException {
- outStream.writeBytes(buf, 0, packetSize);
- }
-
/**
* The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
* other wise it fails. So read everything as bytes and then extract an integer out of it
*/
private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
- | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
+ | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
}
@@ -1296,9 +1326,9 @@ public class SnapPuller {
*/
private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
- | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
- | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
- | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
+ | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
+ | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
+ | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
}
@@ -1307,30 +1337,30 @@ public class SnapPuller {
*/
private void cleanup() {
try {
- outStream.close();
- } catch (Exception e) {/* noop */
- LOG.error("Error closing the file stream: "+ this.saveAs ,e);
+ file.close();
+ } catch (Exception e) {/* no-op */
+ LOG.error("Error closing file: {}", this.saveAs, e);
}
if (bytesDownloaded != size) {
//if the download is not complete then
//delete the file being downloaded
try {
- copy2Dir.deleteFile(saveAs);
+ file.delete();
} catch (Exception e) {
- LOG.error("Error deleting file in cleanup" + e.getMessage());
+ LOG.error("Error deleting file: {}", this.saveAs, e);
}
- //if the failure is due to a user abort it is returned nomally else an exception is thrown
+ //if the failure is due to a user abort it is returned normally else an exception is thrown
if (!aborted)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to download " + fileName + " completely. Downloaded "
- + bytesDownloaded + "!=" + size);
+ "Unable to download " + fileName + " completely. Downloaded "
+ + bytesDownloaded + "!=" + size);
}
}
/**
* Open a new stream using HttpClient
*/
- FastInputStream getStream() throws IOException {
+ private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -1346,7 +1376,7 @@ public class SnapPuller {
params.set(FILE, fileName);
}
if (useInternal) {
- params.set(COMPRESSION, "true");
+ params.set(COMPRESSION, "true");
}
//use checksum
if (this.includeChecksum) {
@@ -1354,18 +1384,18 @@ public class SnapPuller {
}
//wt=filestream this is a custom protocol
params.set(CommonParams.WT, FILE_STREAM);
- // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
- // the server starts from the offset
+ // This happens if there is a failure and there is a retry. The offset=<sizedownloaded> ensures that
+ // the server starts from the offset
if (bytesDownloaded > 0) {
params.set(OFFSET, Long.toString(bytesDownloaded));
}
-
+
NamedList response;
InputStream is = null;
-
- HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler
- try {
+
+ // TODO use shardhandler
+ try (HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null)) {
client.setSoTimeout(60000);
client.setConnectionTimeout(15000);
QueryRequest req = new QueryRequest(params);
@@ -1379,326 +1409,134 @@ public class SnapPuller {
//close stream on error
IOUtils.closeQuietly(is);
throw new IOException("Could not download file '" + fileName + "'", e);
- } finally {
- client.shutdown();
}
}
}
-
- /**
- * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
- *
- * @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream
- */
- private class LocalFsFileFetcher {
- boolean includeChecksum = true;
- private File copy2Dir;
+ private class DirectoryFile implements FileInterface {
+ private final String saveAs;
+ private Directory copy2Dir;
+ private IndexOutput outStream;
- String fileName;
+ DirectoryFile(Directory tmpIndexDir, String saveAs) throws IOException {
+ this.saveAs = saveAs;
+ this.copy2Dir = tmpIndexDir;
+ outStream = copy2Dir.createOutput(this.saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
+ }
- String saveAs;
+ public void sync() throws IOException {
+ copy2Dir.sync(Collections.singleton(saveAs));
+ }
- long size;
+ public void write(byte[] buf, int packetSize) throws IOException {
+ outStream.writeBytes(buf, 0, packetSize);
+ }
- long bytesDownloaded = 0;
+ public void close() throws Exception {
+ outStream.close();
+ }
- FileChannel fileChannel;
-
- private FileOutputStream fileOutputStream;
+ public void delete() throws Exception {
+ copy2Dir.deleteFile(saveAs);
+ }
+ }
- byte[] buf = new byte[1024 * 1024];
+ private class DirectoryFileFetcher extends FileFetcher {
+ DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+ boolean isConf, long latestGen) throws IOException {
+ super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen);
+ }
+ }
- Checksum checksum;
+ private class LocalFsFile implements FileInterface {
+ private File copy2Dir;
+ FileChannel fileChannel;
+ private FileOutputStream fileOutputStream;
File file;
- int errorCount = 0;
-
- private boolean isConf;
-
- private boolean aborted = false;
-
- private Long indexGen;
-
- // TODO: could do more code sharing with DirectoryFileFetcher
- LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
- boolean isConf, long latestGen) throws IOException {
+ LocalFsFile(File dir, String saveAs) throws IOException {
this.copy2Dir = dir;
- this.fileName = (String) fileDetails.get(NAME);
- this.size = (Long) fileDetails.get(SIZE);
- this.isConf = isConf;
- this.saveAs = saveAs;
-
- indexGen = latestGen;
this.file = new File(copy2Dir, saveAs);
-
+
File parentDir = this.file.getParentFile();
if( ! parentDir.exists() ){
if ( ! parentDir.mkdirs() ) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Failed to create (sub)directory for file: " + saveAs);
+ "Failed to create (sub)directory for file: " + saveAs);
}
}
-
+
this.fileOutputStream = new FileOutputStream(file);
this.fileChannel = this.fileOutputStream.getChannel();
-
- if (includeChecksum)
- checksum = new Adler32();
}
- /**
- * The main method which downloads file
- */
- void fetchFile() throws Exception {
- try {
- while (true) {
- final FastInputStream is = getStream();
- int result;
- try {
- //fetch packets one by one in a single request
- result = fetchPackets(is);
- if (result == 0 || result == NO_CONTENT) {
- return;
- }
- //if there is an error continue. But continue from the point where it got broken
- } finally {
- IOUtils.closeQuietly(is);
- }
- }
- } finally {
- cleanup();
- //if cleanup suceeds . The file is downloaded fully. do an fsync
- fsyncService.submit(new Runnable(){
- @Override
- public void run() {
- try {
- FileUtils.sync(file);
- } catch (IOException e) {
- fsyncException = e;
- }
- }
- });
- }
- }
-
- private int fetchPackets(FastInputStream fis) throws Exception {
- byte[] intbytes = new byte[4];
- byte[] longbytes = new byte[8];
- try {
- while (true) {
- if (stop) {
- stop = false;
- aborted = true;
- throw new ReplicationHandlerException("User aborted replication");
- }
- long checkSumServer = -1;
- fis.readFully(intbytes);
- //read the size of the packet
- int packetSize = readInt(intbytes);
- if (packetSize <= 0) {
- LOG.warn("No content received for file: " + currentFile);
- return NO_CONTENT;
- }
- if (buf.length < packetSize)
- buf = new byte[packetSize];
- if (checksum != null) {
- //read the checksum
- fis.readFully(longbytes);
- checkSumServer = readLong(longbytes);
- }
- //then read the packet of bytes
- fis.readFully(buf, 0, packetSize);
- //compare the checksum as sent from the master
- if (includeChecksum) {
- checksum.reset();
- checksum.update(buf, 0, packetSize);
- long checkSumClient = checksum.getValue();
- if (checkSumClient != checkSumServer) {
- LOG.error("Checksum not matched between client and server for: " + currentFile);
- //if checksum is wrong it is a problem return for retry
- return 1;
- }
- }
- //if everything is fine, write down the packet to the file
- fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
- bytesDownloaded += packetSize;
- if (bytesDownloaded >= size)
- return 0;
- //errorcount is always set to zero after a successful packet
- errorCount = 0;
- }
- } catch (ReplicationHandlerException e) {
- throw e;
- } catch (Exception e) {
- LOG.warn("Error in fetching packets ", e);
- //for any failure , increment the error count
- errorCount++;
- //if it fails for the same pacaket for MAX_RETRIES fail and come out
- if (errorCount > MAX_RETRIES) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Fetch failed for file:" + fileName, e);
- }
- return ERR;
- }
+ public void sync() throws IOException {
+ FileUtils.sync(file);
}
- /**
- * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
- * other wise it fails. So read everything as bytes and then extract an integer out of it
- */
- private int readInt(byte[] b) {
- return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
- | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
-
+ public void write(byte[] buf, int packetSize) throws IOException {
+ fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
}
- /**
- * Same as above but to read longs from a byte array
- */
- private long readLong(byte[] b) {
- return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
- | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
- | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
- | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
-
+ public void close() throws Exception {
+ //close the FileOutputStream (which also closes the Channel)
+ fileOutputStream.close();
}
- /**
- * cleanup everything
- */
- private void cleanup() {
- try {
- //close the FileOutputStream (which also closes the Channel)
- fileOutputStream.close();
- } catch (Exception e) {/* noop */
- LOG.error("Error closing the file stream: "+ this.saveAs ,e);
- }
- if (bytesDownloaded != size) {
- //if the download is not complete then
- //delete the file being downloaded
- try {
- Files.delete(file.toPath());
- } catch (SecurityException e) {
- LOG.error("Error deleting file in cleanup" + e.getMessage());
- } catch (Throwable other) {
- // TODO: should this class care if a file couldnt be deleted?
- // this just emulates previous behavior, where only SecurityException would be handled.
- }
- //if the failure is due to a user abort it is returned nomally else an exception is thrown
- if (!aborted)
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to download " + fileName + " completely. Downloaded "
- + bytesDownloaded + "!=" + size);
- }
+ public void delete() throws Exception {
+ Files.delete(file.toPath());
}
+ }
- /**
- * Open a new stream using HttpClient
- */
- FastInputStream getStream() throws IOException {
-
- ModifiableSolrParams params = new ModifiableSolrParams();
-
-// //the method is command=filecontent
- params.set(COMMAND, CMD_GET_FILE);
- params.set(GENERATION, Long.toString(indexGen));
- params.set(CommonParams.QT, "/replication");
- //add the version to download. This is used to reserve the download
- if (isConf) {
- //set cf instead of file for config file
- params.set(CONF_FILE_SHORT, fileName);
- } else {
- params.set(FILE, fileName);
- }
- if (useInternal) {
- params.set(COMPRESSION, "true");
- }
- //use checksum
- if (this.includeChecksum) {
- params.set(CHECKSUM, true);
- }
- //wt=filestream this is a custom protocol
- params.set(CommonParams.WT, FILE_STREAM);
- // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
- // the server starts from the offset
- if (bytesDownloaded > 0) {
- params.set(OFFSET, Long.toString(bytesDownloaded));
- }
-
-
- NamedList response;
- InputStream is = null;
- HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler
- try {
- client.setSoTimeout(60000);
- client.setConnectionTimeout(15000);
- QueryRequest req = new QueryRequest(params);
- response = client.request(req);
- is = (InputStream) response.get("stream");
- if(useInternal) {
- is = new InflaterInputStream(is);
- }
- return new FastInputStream(is);
- } catch (Exception e) {
- //close stream on error
- IOUtils.closeQuietly(is);
- throw new IOException("Could not download file '" + fileName + "'", e);
- } finally {
- client.shutdown();
- }
+ private class LocalFsFileFetcher extends FileFetcher {
+ LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
+ boolean isConf, long latestGen) throws IOException {
+ super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen);
}
}
-
+
NamedList getDetails() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_DETAILS);
params.set("slave", false);
params.set(CommonParams.QT, "/replication");
- HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient); //XXX use shardhandler
- NamedList rsp;
- try {
+
+ // TODO use shardhandler
+ try (HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient)) {
client.setSoTimeout(60000);
client.setConnectionTimeout(15000);
QueryRequest request = new QueryRequest(params);
- rsp = client.request(request);
- } finally {
- client.shutdown();
+ return client.request(request);
}
- return rsp;
}
static Integer readInterval(String interval) {
if (interval == null)
return null;
int result = 0;
- if (interval != null) {
- Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
- if (m.find()) {
- String hr = m.group(1);
- String min = m.group(2);
- String sec = m.group(3);
- result = 0;
- try {
- if (sec != null && sec.length() > 0)
- result += Integer.parseInt(sec);
- if (min != null && min.length() > 0)
- result += (60 * Integer.parseInt(min));
- if (hr != null && hr.length() > 0)
- result += (60 * 60 * Integer.parseInt(hr));
- result *= 1000;
- } catch (NumberFormatException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- INTERVAL_ERR_MSG);
- }
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- INTERVAL_ERR_MSG);
+ Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
+ if (m.find()) {
+ String hr = m.group(1);
+ String min = m.group(2);
+ String sec = m.group(3);
+ result = 0;
+ try {
+ if (sec != null && sec.length() > 0)
+ result += Integer.parseInt(sec);
+ if (min != null && min.length() > 0)
+ result += (60 * Integer.parseInt(min));
+ if (hr != null && hr.length() > 0)
+ result += (60 * 60 * Integer.parseInt(hr));
+ result *= 1000;
+ } catch (NumberFormatException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
}
-
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
}
+
return result;
}
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/SolrConfigHandler.java Sun Feb 8 23:53:14 2015
@@ -40,6 +40,7 @@ import org.apache.solr.common.util.Conte
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.ConfigOverlay;
import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.PluginsRegistry;
import org.apache.solr.core.RequestParams;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrResourceLoader;
@@ -99,7 +100,7 @@ public class SolrConfigHandler extends R
private void handleGET() {
if(parts.size() == 1) {
- resp.add("solrConfig", req.getCore().getSolrConfig().toMap());
+ resp.add("config", getConfigDetails());
} else {
if(ConfigOverlay.NAME.equals(parts.get(1))){
resp.add(ConfigOverlay.NAME, req.getCore().getSolrConfig().getOverlay().toMap());
@@ -118,12 +119,27 @@ public class SolrConfigHandler extends R
}
} else {
- Map<String, Object> m = req.getCore().getSolrConfig().toMap();
- resp.add("solrConfig", ZkNodeProps.makeMap(parts.get(1),m.get(parts.get(1))));
+ Map<String, Object> m = getConfigDetails();
+ resp.add("config", ZkNodeProps.makeMap(parts.get(1),m.get(parts.get(1))));
}
}
}
+ private Map<String, Object> getConfigDetails() {
+ Map<String, Object> map = req.getCore().getSolrConfig().toMap();
+ Map reqHandlers = (Map) map.get(SolrRequestHandler.TYPE);
+ if(reqHandlers == null) map.put(SolrRequestHandler.TYPE, reqHandlers = new LinkedHashMap<>());
+ List<PluginInfo> plugins = PluginsRegistry.getHandlers(req.getCore());
+ for (PluginInfo plugin : plugins) {
+ if(SolrRequestHandler.TYPE.equals( plugin.type)){
+ if(!reqHandlers.containsKey(plugin.name)){
+ reqHandlers.put(plugin.name,plugin.toMap());
+ }
+ }
+ }
+ return map;
+ }
+
private void handlePOST() throws IOException {
Iterable<ContentStream> streams = req.getContentStreams();
@@ -236,11 +252,16 @@ public class SolrConfigHandler extends R
SolrResourceLoader loader = req.getCore().getResourceLoader();
if (loader instanceof ZkSolrResourceLoader) {
- ZkController.persistConfigResourceToZooKeeper(loader,params.getZnodeVersion(),
- RequestParams.RESOURCE,params.toByteArray(),true);
+ ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) loader;
+ if(ops.isEmpty()) {
+ ZkController.touchConfDir(zkLoader);
+ }else {
+ ZkController.persistConfigResourceToZooKeeper(zkLoader, params.getZnodeVersion(),
+ RequestParams.RESOURCE, params.toByteArray(), true);
+ }
} else {
- SolrResourceLoader.persistConfLocally(loader, ConfigOverlay.RESOURCE_NAME, params.toByteArray());
+ SolrResourceLoader.persistConfLocally(loader, RequestParams.RESOURCE, params.toByteArray());
req.getCore().getSolrConfig().refreshRequestParams();
}
@@ -278,7 +299,7 @@ public class SolrConfigHandler extends R
SolrResourceLoader loader = req.getCore().getResourceLoader();
if (loader instanceof ZkSolrResourceLoader) {
- ZkController.persistConfigResourceToZooKeeper(loader,overlay.getZnodeVersion(),
+ ZkController.persistConfigResourceToZooKeeper((ZkSolrResourceLoader) loader,overlay.getZnodeVersion(),
ConfigOverlay.RESOURCE_NAME,overlay.toByteArray(),true);
} else {
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Sun Feb 8 23:53:14 2015
@@ -17,66 +17,7 @@ package org.apache.solr.handler.admin;
* limitations under the License.
*/
-import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
-import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
-import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
@@ -118,7 +59,65 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
+import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
public class CollectionsHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class);
@@ -773,17 +772,15 @@ public class CollectionsHandler extends
ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
- HttpSolrClient server = new HttpSolrClient(nodeProps.getBaseUrl());
- try {
- server.setConnectionTimeout(15000);
- server.setSoTimeout(60000);
+ ;
+ try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
+ client.setConnectionTimeout(15000);
+ client.setSoTimeout(60000);
RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
reqSyncShard.setCollection(collection);
reqSyncShard.setShard(shard);
reqSyncShard.setCoreName(nodeProps.getCoreName());
- server.request(reqSyncShard);
- } finally {
- server.shutdown();
+ client.request(reqSyncShard);
}
}
@@ -853,6 +850,9 @@ public class CollectionsHandler extends
DocCollection.STATE_FORMAT,
AUTO_ADD_REPLICAS,
"router.");
+ if(props.get(DocCollection.STATE_FORMAT) == null){
+ props.put(DocCollection.STATE_FORMAT,"2");
+ }
if(SYSTEM_COLL.equals(name)){
//We must always create a system collection with only a single shard
Modified: lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1658277&r1=1658276&r2=1658277&view=diff
==============================================================================
--- lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/lucene6005/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Sun Feb 8 23:53:14 2015
@@ -17,25 +17,8 @@
package org.apache.solr.handler.admin;
-import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.MatchAllDocsQuery;
@@ -67,7 +50,6 @@ import org.apache.solr.core.CoreDescript
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrXMLCoresLocator;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
@@ -86,8 +68,24 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
/**
*
@@ -586,12 +584,7 @@ public class CoreAdminHandler extends Re
// only write out the descriptor if the core is successfully created
coreContainer.getCoresLocator().create(coreContainer, dcore);
-
- if (coreContainer.getCoresLocator() instanceof SolrXMLCoresLocator) {
- // hack - in this case we persist once more because a core create race might
- // have dropped entries.
- coreContainer.getCoresLocator().create(coreContainer);
- }
+
rsp.add("core", core.getName());
}
catch (Exception ex) {
@@ -694,7 +687,6 @@ public class CoreAdminHandler extends Re
}
try {
if (cname == null) {
- rsp.add("defaultCoreName", coreContainer.getDefaultCoreName());
for (String name : coreContainer.getAllCoreNames()) {
status.add(name, getCoreStatus(coreContainer, name, isIndexInfoNeeded));
}
@@ -1111,7 +1103,6 @@ public class CoreAdminHandler extends Re
CoreDescriptor desc = cores.getUnloadedCoreDescriptor(cname);
if (desc != null) {
info.add("name", desc.getName());
- info.add("isDefaultCore", desc.getName().equals(cores.getDefaultCoreName()));
info.add("instanceDir", desc.getInstanceDir());
// None of the following are guaranteed to be present in a not-yet-loaded core.
String tmp = desc.getDataDir();
@@ -1126,7 +1117,6 @@ public class CoreAdminHandler extends Re
try (SolrCore core = cores.getCore(cname)) {
if (core != null) {
info.add("name", core.getName());
- info.add("isDefaultCore", core.getName().equals(cores.getDefaultCoreName()));
info.add("instanceDir", normalizePath(core.getResourceLoader().getInstanceDir()));
info.add("dataDir", normalizePath(core.getDataDir()));
info.add("config", core.getConfigResource());