You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/01/24 01:55:54 UTC
svn commit: r1560860 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/handler/loader/
solr/core/src/test/org/apache/solr/cloud/ solr/solrj/
solr/solrj/src/java/org/apache/solr/client/solrj/request/ solr/solrj/...
Author: markrmiller
Date: Fri Jan 24 00:55:53 2014
New Revision: 1560860
URL: http://svn.apache.org/r1560860
Log:
SOLR-5658: commitWithin and overwrite are not being distributed to replicas now that SolrCloud uses javabin to distribute updates.
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Fri Jan 24 00:55:53 2014
@@ -370,6 +370,10 @@ Bug Fixes
* SOLR-5636: SolrRequestParsers does some xpath lookups on every request, which
can cause concurrency issues. (Mark Miller)
+* SOLR-5658: commitWithin and overwrite are not being distributed to replicas
+ now that SolrCloud uses javabin to distribute updates.
+ (Mark Miller, Varun Thacker, Elodie Sannier, shalin)
+
Optimizations
----------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Fri Jan 24 00:55:53 2014
@@ -70,7 +70,7 @@ public class JavabinLoader extends Conte
private AddUpdateCommand addCmd = null;
@Override
- public void update(SolrInputDocument document, UpdateRequest updateRequest) {
+ public void update(SolrInputDocument document, UpdateRequest updateRequest, Integer commitWithin, Boolean overwrite) {
if (document == null) {
// Perhaps commit from the parameters
try {
@@ -85,6 +85,13 @@ public class JavabinLoader extends Conte
addCmd = getAddCommand(req, updateRequest.getParams());
}
addCmd.solrDoc = document;
+ if (commitWithin != null) {
+ addCmd.commitWithin = commitWithin;
+ }
+ if (overwrite != null) {
+ addCmd.overwrite = overwrite;
+ }
+
try {
processor.processAdd(addCmd);
addCmd.clear();
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Fri Jan 24 00:55:53 2014
@@ -21,12 +21,12 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@@ -336,6 +336,24 @@ public class BasicDistributedZkTest exte
query(false, new Object[] {"q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.RESULTS});
query(false, new Object[] {"q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY});
+ // try commitWithin
+ long before = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("commitWithin", 10);
+ add(cloudClient, params , getDoc("id", 300));
+
+ long timeout = System.currentTimeMillis() + 15000;
+ while (cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() != before + 1) {
+ if (timeout <= System.currentTimeMillis()) {
+ fail("commitWithin did not work");
+ }
+ Thread.sleep(100);
+ }
+
+ for (SolrServer client : clients) {
+ assertEquals("commitWithin did not work", before + 1, client.query(new SolrQuery("*:*")).getResults().getNumFound());
+ }
+
// TODO: This test currently fails because debug info is obtained only
// on shards with matches.
// query("q","matchesnothing","fl","*,score", "debugQuery", "true");
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Fri Jan 24 00:55:53 2014
@@ -25,7 +25,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -60,9 +59,6 @@ public class JavaBinUpdateRequestCodec {
}
Iterator<SolrInputDocument> docIter = null;
- if (updateRequest.getDocuments() != null) {
- docIter = updateRequest.getDocuments().iterator();
- }
if(updateRequest.getDocIterator() != null){
docIter = updateRequest.getDocIterator();
}
@@ -70,10 +66,19 @@ public class JavaBinUpdateRequestCodec {
Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
nl.add("params", params);// 0: params
- nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
+ if (updateRequest.getDeleteByIdMap() != null) {
+ nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
+ }
nl.add("delByQ", updateRequest.getDeleteQuery());
- nl.add("docs", docIter);
- nl.add("docsMap", docMap);
+
+ if (docMap != null) {
+ nl.add("docsMap", docMap.entrySet().iterator());
+ } else {
+ if (updateRequest.getDocuments() != null) {
+ docIter = updateRequest.getDocuments().iterator();
+ }
+ nl.add("docs", docIter);
+ }
JavaBinCodec codec = new JavaBinCodec();
codec.marshal(nl, os);
}
@@ -92,7 +97,7 @@ public class JavaBinUpdateRequestCodec {
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
final UpdateRequest updateRequest = new UpdateRequest();
List<List<NamedList>> doclist;
- Map<SolrInputDocument,Map<String,Object>> docMap;
+ List<Entry<SolrInputDocument,Map<Object,Object>>> docMap;
List<String> delById;
Map<String,Map<String,Object>> delByIdMap;
List<String> delByQ;
@@ -132,9 +137,11 @@ public class JavaBinUpdateRequestCodec {
}
private List readOuterMostDocIterator(DataInputInputStream fis) throws IOException {
- NamedList params = (NamedList) namedList[0].getVal(0);
+ NamedList params = (NamedList) namedList[0].get("params");
updateRequest.setParams(new ModifiableSolrParams(SolrParams.toSolrParams(params)));
if (handler == null) return super.readIterator(fis);
+ Integer commitWithin = null;
+ Boolean overwrite = null;
while (true) {
Object o = readVal(fis);
if (o == END_OBJ) break;
@@ -144,16 +151,24 @@ public class JavaBinUpdateRequestCodec {
} else if (o instanceof NamedList) {
UpdateRequest req = new UpdateRequest();
req.setParams(new ModifiableSolrParams(SolrParams.toSolrParams((NamedList) o)));
- handler.update(null, req);
+ handler.update(null, req, null, null);
+ } else if (o instanceof Map.Entry){
+ sdoc = (SolrInputDocument) ((Map.Entry) o).getKey();
+ Map p = (Map) ((Map.Entry) o).getValue();
+ if (p != null) {
+ commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN);
+ overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
+ }
} else {
+
sdoc = (SolrInputDocument) o;
}
- handler.update(sdoc, updateRequest);
+ handler.update(sdoc, updateRequest, commitWithin, overwrite);
}
return Collections.EMPTY_LIST;
}
- };
+ };
codec.unmarshal(is);
@@ -161,6 +176,7 @@ public class JavaBinUpdateRequestCodec {
// must be loaded now
if(updateRequest.getParams()==null) {
NamedList params = (NamedList) namedList[0].get("params");
+ System.out.println("unmarchal params:" + params);
if(params!=null) {
updateRequest.setParams(new ModifiableSolrParams(SolrParams.toSolrParams(params)));
}
@@ -169,32 +185,12 @@ public class JavaBinUpdateRequestCodec {
delByIdMap = (Map<String,Map<String,Object>>) namedList[0].get("delByIdMap");
delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List) namedList[0].get("docs");
- docMap = (Map<SolrInputDocument,Map<String,Object>>) namedList[0].get("docsMap");
+ docMap = (List<Entry<SolrInputDocument,Map<Object,Object>>>) namedList[0].get("docsMap");
+
+
+ // we don't add any docs, because they were already processed
+ // deletes are handled later, and must be passed back on the UpdateRequest
- if (doclist != null && !doclist.isEmpty()) {
- List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
- for (Object o : doclist) {
- if (o instanceof List) {
- solrInputDocs.add(listToSolrInputDocument((List<NamedList>)o));
- } else {
- solrInputDocs.add((SolrInputDocument)o);
- }
- }
- updateRequest.add(solrInputDocs);
- }
- if (docMap != null && !docMap.isEmpty()) {
- Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docMap.entrySet();
- for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
- Map<String,Object> map = entry.getValue();
- Boolean overwrite = null;
- Integer commitWithin = null;
- if (map != null) {
- overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE);
- commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN);
- }
- updateRequest.add(entry.getKey(), commitWithin, overwrite);
- }
- }
if (delById != null) {
for (String s : delById) {
updateRequest.deleteById(s);
@@ -204,7 +200,7 @@ public class JavaBinUpdateRequestCodec {
for (Map.Entry<String,Map<String,Object>> entry : delByIdMap.entrySet()) {
Map<String,Object> params = entry.getValue();
if (params != null) {
- Long version = (Long) params.get("ver");
+ Long version = (Long) params.get(UpdateRequest.VER);
updateRequest.deleteById(entry.getKey(), version);
} else {
updateRequest.deleteById(entry.getKey());
@@ -217,8 +213,8 @@ public class JavaBinUpdateRequestCodec {
updateRequest.deleteByQuery(s);
}
}
+
return updateRequest;
-
}
private SolrInputDocument listToSolrInputDocument(List<NamedList> namedList) {
@@ -242,6 +238,6 @@ public class JavaBinUpdateRequestCodec {
}
public static interface StreamingUpdateHandler {
- public void update(SolrInputDocument document, UpdateRequest req);
+ public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean override);
}
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Fri Jan 24 00:55:53 2014
@@ -47,7 +47,7 @@ import org.apache.solr.common.util.XML;
*/
public class UpdateRequest extends AbstractUpdateRequest {
- private static final String VER = "ver";
+ public static final String VER = "ver";
public static final String OVERWRITE = "ow";
public static final String COMMIT_WITHIN = "cw";
private Map<SolrInputDocument,Map<String,Object>> documents = null;
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java Fri Jan 24 00:55:53 2014
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
+import java.util.Map.Entry;
import java.nio.ByteBuffer;
/**
@@ -65,6 +66,7 @@ public class JavaBinCodec {
SOLRINPUTDOC = 16,
SOLRINPUTDOC_CHILDS = 17,
ENUM_FIELD_VALUE = 18,
+ MAP_ENTRY = 19,
// types that combine tag + length (or other info) in a single byte
TAG_AND_LEN = (byte) (1 << 5),
STR = (byte) (1 << 5),
@@ -227,6 +229,8 @@ public class JavaBinCodec {
return readSolrInputDocument(dis);
case ENUM_FIELD_VALUE:
return readEnumFieldValue(dis);
+ case MAP_ENTRY:
+ return readMapEntry(dis);
}
throw new RuntimeException("Unknown type " + tagByte);
@@ -286,6 +290,10 @@ public class JavaBinCodec {
writeEnumFieldValue((EnumFieldValue) val);
return true;
}
+ if (val instanceof Map.Entry) {
+ writeMapEntry((Map.Entry)val);
+ return true;
+ }
return false;
}
@@ -480,6 +488,12 @@ public class JavaBinCodec {
writeInt(enumFieldValue.toInt());
writeStr(enumFieldValue.toString());
}
+
+ public void writeMapEntry(Entry<Object,Object> val) throws IOException {
+ writeTag(MAP_ENTRY);
+ writeVal(val.getKey());
+ writeVal(val.getValue());
+ }
/**
* read {@link EnumFieldValue} (int+string) from input stream
@@ -491,6 +505,33 @@ public class JavaBinCodec {
String stringValue = (String) readVal(dis);
return new EnumFieldValue(intValue, stringValue);
}
+
+
+ public Map.Entry<Object,Object> readMapEntry(DataInputInputStream dis) throws IOException {
+ final Object key = readVal(dis);
+ final Object value = readVal(dis);
+ return new Map.Entry<Object,Object>() {
+
+ @Override
+ public Object getKey() {
+ return key;
+ }
+
+ @Override
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "MapEntry[" + key.toString() + ":" + value.toString() + "]";
+ }
+
+ @Override
+ public Object setValue(Object value) {
+ throw new UnsupportedOperationException();
+ }};
+ }
/**
* write the string as tag+length, with length being the number of UTF-8 bytes
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java?rev=1560860&r1=1560859&r2=1560860&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java Fri Jan 24 00:55:53 2014
@@ -85,7 +85,7 @@ public class TestUpdateRequestCodec exte
final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
@Override
- public void update(SolrInputDocument document, UpdateRequest req) {
+ public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean overwrite) {
Assert.assertNotNull(req.getParams());
docs.add(document);
}
@@ -136,7 +136,7 @@ public class TestUpdateRequestCodec exte
final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
@Override
- public void update(SolrInputDocument document, UpdateRequest req) {
+ public void update(SolrInputDocument document, UpdateRequest req, Integer commitWithin, Boolean overwrite) {
Assert.assertNotNull(req.getParams());
docs.add(document);
}