You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/01/11 18:28:49 UTC
svn commit: r1230162 -
/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java
Author: yonik
Date: Wed Jan 11 17:28:48 2012
New Revision: 1230162
URL: http://svn.apache.org/viewvc?rev=1230162&view=rev
Log:
Apply oldest updates first (iterate the received updates list from the last index down to the first).
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1230162&r1=1230161&r2=1230162&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java Wed Jan 11 17:28:48 2012
@@ -264,70 +264,71 @@ public class PeerSync {
UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
- Object obj = null;
+ Object o = null;
try {
- for (Object o : updates) {
- obj = o; // for error reporting
+ // Apply oldest updates first
+ for (int i=updates.size()-1; i>=0; i--) {
+ o = updates.get(i);
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+
+ int oper = (Integer)entry.get(0);
+ long version = (Long) entry.get(1);
+
+ switch (oper) {
+ case UpdateLog.ADD:
+ {
+ // byte[] idBytes = (byte[]) entry.get(2);
+ SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.solrDoc = sdoc;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processAdd(cmd);
+ break;
+ }
+ case UpdateLog.DELETE:
+ {
+ byte[] idBytes = (byte[]) entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.setIndexedId(new BytesRef(idBytes));
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY:
+ {
+ String query = (String)entry.get(2);
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.query = query;
+ cmd.setVersion(version);
+ cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
+ proc.processDelete(cmd);
+ break;
+ }
- // should currently be a List<Oper,Ver,Doc/Id>
- List entry = (List)o;
-
- int oper = (Integer)entry.get(0);
- long version = (Long) entry.get(1);
-
- switch (oper) {
- case UpdateLog.ADD:
- {
- // byte[] idBytes = (byte[]) entry.get(2);
- SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
- AddUpdateCommand cmd = new AddUpdateCommand(req);
- // cmd.setIndexedId(new BytesRef(idBytes));
- cmd.solrDoc = sdoc;
- cmd.setVersion(version);
- cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
- proc.processAdd(cmd);
- break;
- }
- case UpdateLog.DELETE:
- {
- byte[] idBytes = (byte[]) entry.get(2);
- DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
- cmd.setIndexedId(new BytesRef(idBytes));
- cmd.setVersion(version);
- cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
- proc.processDelete(cmd);
- break;
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
- case UpdateLog.DELETE_BY_QUERY:
- {
- String query = (String)entry.get(2);
- DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
- cmd.query = query;
- cmd.setVersion(version);
- cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
- proc.processDelete(cmd);
- break;
- }
-
- default:
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
}
-
- }
catch (IOException e) {
// TODO: should this be handled separately as a problem with us?
// I guess it probably already will by causing replication to be kicked off.
sreq.updateException = e;
- log.error("Error applying updates from " + sreq.shards + " ,update=" + obj, e);
+ log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
return false;
}
catch (Exception e) {
sreq.updateException = e;
- log.error("Error applying updates from " + sreq.shards + " ,update=" + obj, e);
+ log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
return false;
}