You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/02/13 07:24:10 UTC
[lucene-solr] branch master updated: SOLR-13171 : A true streaming
parser for javabin payload/stream without creating any objects
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new cfefdad SOLR-13171 : A true streaming parser for javabin payload/stream without creating any objects
cfefdad is described below
commit cfefdade5f7482250afffe0c3237d18a427eb281
Author: Noble Paul <no...@apache.org>
AuthorDate: Wed Feb 13 18:23:43 2019 +1100
SOLR-13171 : A true streaming parser for javabin payload/stream without creating any objects
---
solr/CHANGES.txt | 1 +
.../apache/solr/response/BinaryResponseWriter.java | 9 +
.../client/solrj/FastStreamingDocsCallback.java | 66 ++
.../org/apache/solr/client/solrj/SolrClient.java | 14 +-
.../solrj/impl/StreamingBinaryResponseParser.java | 169 +++--
.../java/org/apache/solr/common/SolrDocument.java | 1 +
.../org/apache/solr/common/util/DataEntry.java | 181 +++++
.../solr/common/util/DataInputInputStream.java | 14 +
.../apache/solr/common/util/FastInputStream.java | 7 +
.../solr/common/util/FastJavaBinDecoder.java | 829 +++++++++++++++++++++
.../org/apache/solr/common/util/JavaBinCodec.java | 94 +--
.../org/apache/solr/common/util/StringBytes.java | 78 ++
.../apache/solr/common/util/Utf8CharSequence.java | 4 +-
.../java/org/apache/solr/common/util/Utils.java | 1 +
solr/solrj/src/test-files/solrj/javabin_sample.bin | Bin 0 -> 868 bytes
.../solr/common/util/TestFastJavabinDecoder.java | 302 ++++++++
.../apache/solr/common/util/TestJavaBinCodec.java | 16 +-
.../solr/common/util/Utf8CharSequenceTest.java | 2 +-
18 files changed, 1663 insertions(+), 125 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 70a030c..460daae 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -52,6 +52,7 @@ Upgrade Notes
New Features
----------------------
+* SOLR-13171 : A true streaming parser for javabin payload/stream without creating any objects (noble)
Bug Fixes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/response/BinaryResponseWriter.java b/solr/core/src/java/org/apache/solr/response/BinaryResponseWriter.java
index be6317b..4b1fdc5 100644
--- a/solr/core/src/java/org/apache/solr/response/BinaryResponseWriter.java
+++ b/solr/core/src/java/org/apache/solr/response/BinaryResponseWriter.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.response;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -60,6 +61,14 @@ public class BinaryResponseWriter implements BinaryQueryResponseWriter {
}
}
+ /**
+ * Marshals the values of {@code response} in javabin format into the file named {@code f}.
+ * Both the codec and the file stream are closed by the try-with-resources statement.
+ * NOTE(review): no caller of this private helper is visible in this hunk - confirm it is still needed.
+ *
+ * @param response the response whose {@code getValues()} result is written
+ * @param resolver passed to the {@code JavaBinCodec} constructor and to {@code setWritableDocFields}
+ * @param f name of the file to write to
+ * @throws IOException if the file cannot be opened or written
+ */
+ private static void serialize(SolrQueryResponse response,Resolver resolver, String f) throws IOException {
+ try (JavaBinCodec jbc = new JavaBinCodec(resolver); FileOutputStream fos = new FileOutputStream(f)) {
+ jbc.setWritableDocFields(resolver).marshal(response.getValues(), fos);
+ fos.flush();
+ }
+
+ }
+
@Override
public void write(Writer writer, SolrQueryRequest request, SolrQueryResponse response) throws IOException {
throw new RuntimeException("This is a binary writer , Cannot write to a characterstream");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/FastStreamingDocsCallback.java b/solr/solrj/src/java/org/apache/solr/client/solrj/FastStreamingDocsCallback.java
new file mode 100644
index 0000000..57701ff
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/FastStreamingDocsCallback.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj;
+
+
+import org.apache.solr.common.util.DataEntry;
+
+/**
+ * Callback through which documents of a javabin response are streamed field by field.
+ * Each method may return an arbitrary context object which is handed back to the callbacks
+ * of the next level (doc list, then document, then field).
+ */
+public interface FastStreamingDocsCallback {
+ /** Callback for a doclist
+ *
+ * @param numFound the first value of the doc list metadata
+ * @param start the second value of the doc list metadata
+ * @param maxScore the third value of the doc list metadata
+ * @return the object to be shared with all the {@link #startDoc(Object)} calls. Return null if nothing needs to be shared
+ */
+ default Object initDocList(Long numFound, Long start, Float maxScore) {
+ return null;
+ }
+
+
+ /**
+ * Started a document
+ *
+ * @param docListObj This object is the value returned by the {@link #initDocList(Long, Long, Float)} method
+ * @return any arbitrary object that should be shared between each field
+ */
+ Object startDoc(Object docListObj);
+
+ /**
+ * Found a new field
+ *
+ * @param field Read the appropriate value
+ * @param docObj The object returned by {@link #startDoc(Object)} method
+ */
+ void field(DataEntry field, Object docObj);
+
+ /**
+ * A document ends
+ *
+ * @param docObj The object returned by {@link #startDoc(Object)} method
+ */
+ default void endDoc(Object docObj) { }
+
+ /** A new child doc starts
+ * @param parentDocObj the object associated with the parent document, i.e. the one that is passed to its {@link FastStreamingDocsCallback#field(DataEntry, Object)} calls
+ * @return any custom object that should be shared with the fields in this child doc
+ */
+ default Object startChildDoc(Object parentDocObj) {
+ return null;
+ }
+
+
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
index 8aebea0..885edc9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
@@ -1058,9 +1058,19 @@ public abstract class SolrClient implements Serializable, Closeable {
*/
public QueryResponse queryAndStreamResponse(String collection, SolrParams params, StreamingResponseCallback callback)
throws SolrServerException, IOException {
- ResponseParser parser = new StreamingBinaryResponseParser(callback);
+ return getQueryResponse(collection, params, new StreamingBinaryResponseParser(callback));
+ }
+
+ /**
+ * Variant of {@code queryAndStreamResponse} that takes a {@link FastStreamingDocsCallback}.
+ * It wraps the callback in a {@link StreamingBinaryResponseParser} and delegates to
+ * {@code getQueryResponse}.
+ *
+ * @param collection passed through to {@code getQueryResponse}
+ * @param params passed through to {@code getQueryResponse}
+ * @param callback the callback handed to the {@link StreamingBinaryResponseParser}
+ */
+ public QueryResponse queryAndStreamResponse(String collection, SolrParams params, FastStreamingDocsCallback callback)
+ throws SolrServerException, IOException {
+ return getQueryResponse(collection, params, new StreamingBinaryResponseParser(callback));
+ }
+
+ private QueryResponse getQueryResponse(String collection, SolrParams params, ResponseParser parser) throws SolrServerException, IOException {
QueryRequest req = new QueryRequest(params);
- req.setStreamingResponseCallback(callback);
+ if (parser instanceof StreamingBinaryResponseParser) {
+ req.setStreamingResponseCallback(((StreamingBinaryResponseParser) parser).callback);
+ }
req.setResponseParser(parser);
return req.process(this, collection);
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java
index b70daee..5c41f6b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingBinaryResponseParser.java
@@ -20,11 +20,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
+import org.apache.solr.client.solrj.FastStreamingDocsCallback;
import org.apache.solr.client.solrj.StreamingResponseCallback;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.DataEntry;
+import org.apache.solr.common.util.DataEntry.EntryListener;
import org.apache.solr.common.util.DataInputInputStream;
+import org.apache.solr.common.util.FastJavaBinDecoder;
+import org.apache.solr.common.util.FastJavaBinDecoder.EntryImpl;
+import org.apache.solr.common.util.FastJavaBinDecoder.Tag;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
@@ -36,64 +42,133 @@ import org.apache.solr.common.util.NamedList;
* @since solr 4.0
*/
public class StreamingBinaryResponseParser extends BinaryResponseParser {
- final StreamingResponseCallback callback;
-
- public StreamingBinaryResponseParser( StreamingResponseCallback cb )
- {
+ public final StreamingResponseCallback callback;
+ public final FastStreamingDocsCallback fastCallback;
+
+ public StreamingBinaryResponseParser(StreamingResponseCallback cb) {
this.callback = cb;
+ fastCallback = null;
+ }
+
+ public StreamingBinaryResponseParser(FastStreamingDocsCallback cb) {
+ this.fastCallback = cb;
+ this.callback = null;
+
}
@Override
public NamedList<Object> processResponse(InputStream body, String encoding) {
- try (JavaBinCodec codec = new JavaBinCodec() {
+ if (callback != null) {
+ return streamDocs(body);
+ } else {
+ try {
+ return fastStreamDocs(body, fastCallback);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to parse", e);
+ }
+ }
- private int nestedLevel;
-
- @Override
- public SolrDocument readSolrDocument(DataInputInputStream dis) throws IOException {
- nestedLevel++;
- SolrDocument doc = super.readSolrDocument(dis);
- nestedLevel--;
- if (nestedLevel == 0) {
- // parent document
- callback.streamSolrDocument(doc);
- return null;
- } else {
- // child document
- return doc;
- }
+ }
+
+ private NamedList<Object> fastStreamDocs(InputStream body, FastStreamingDocsCallback fastCallback) throws IOException {
+
+ fieldListener = new EntryListener() {
+ @Override
+ public void entry(DataEntry field) {
+ if (((EntryImpl) field).getTag() == Tag._SOLRDOC) {
+ field.listenContainer(fastCallback.startChildDoc(field.ctx()), fieldListener);
+ } else {
+ fastCallback.field(field, field.ctx());
}
+ }
- @Override
- public SolrDocumentList readSolrDocumentList(DataInputInputStream dis) throws IOException {
- SolrDocumentList solrDocs = new SolrDocumentList();
- List list = (List) readVal(dis);
- solrDocs.setNumFound((Long) list.get(0));
- solrDocs.setStart((Long) list.get(1));
- solrDocs.setMaxScore((Float) list.get(2));
-
- callback.streamDocListInfo(
- solrDocs.getNumFound(),
- solrDocs.getStart(),
- solrDocs.getMaxScore() );
-
- // Read the Array
- tagByte = dis.readByte();
- if( (tagByte >>> 5) != (ARR >>> 5) ) {
- throw new RuntimeException( "doclist must have an array" );
- }
- int sz = readSize(dis);
- for (int i = 0; i < sz; i++) {
- // must be a SolrDocument
- readVal( dis );
+ @Override
+ public void end(DataEntry e) {
+ fastCallback.endDoc(((EntryImpl) e).ctx);
+ }
+ };
+ docListener = e -> {
+ EntryImpl entry = (EntryImpl) e;
+ if (entry.getTag() == Tag._SOLRDOC) {//this is a doc
+ entry.listenContainer(fastCallback.startDoc(entry.ctx()), fieldListener);
+ }
+ };
+ new FastJavaBinDecoder()
+ .withInputStream(body)
+ .decode(new EntryListener() {
+ @Override
+ public void entry(DataEntry e) {
+ EntryImpl entry = (EntryImpl) e;
+ if( !entry.type().isContainer) return;
+ if (e.isKeyValEntry() && entry.getTag() == Tag._SOLRDOCLST) {
+ List l = (List) e.metadata();
+ e.listenContainer(fastCallback.initDocList(
+ (Long) l.get(0),
+ (Long) l.get(1),
+ (Float) l.get(2)),
+ docListener);
+ } else {
+ e.listenContainer(null, this);
+ }
}
- return solrDocs;
+ });
+ return null;
+ }
+
+
+ private EntryListener fieldListener;
+ private EntryListener docListener;
+
+
+ private NamedList<Object> streamDocs(InputStream body) {
+ try (JavaBinCodec codec = new JavaBinCodec() {
+
+ private int nestedLevel;
+
+ @Override
+ public SolrDocument readSolrDocument(DataInputInputStream dis) throws IOException {
+ nestedLevel++;
+ SolrDocument doc = super.readSolrDocument(dis);
+ nestedLevel--;
+ if (nestedLevel == 0) {
+ // parent document
+ callback.streamSolrDocument(doc);
+ return null;
+ } else {
+ // child document
+ return doc;
}
- };) {
-
+ }
+
+ @Override
+ public SolrDocumentList readSolrDocumentList(DataInputInputStream dis) throws IOException {
+ SolrDocumentList solrDocs = new SolrDocumentList();
+ List list = (List) readVal(dis);
+ solrDocs.setNumFound((Long) list.get(0));
+ solrDocs.setStart((Long) list.get(1));
+ solrDocs.setMaxScore((Float) list.get(2));
+
+ callback.streamDocListInfo(
+ solrDocs.getNumFound(),
+ solrDocs.getStart(),
+ solrDocs.getMaxScore());
+
+ // Read the Array
+ tagByte = dis.readByte();
+ if ((tagByte >>> 5) != (ARR >>> 5)) {
+ throw new RuntimeException("doclist must have an array");
+ }
+ int sz = readSize(dis);
+ for (int i = 0; i < sz; i++) {
+ // must be a SolrDocument
+ readVal(dis);
+ }
+ return solrDocs;
+ }
+ };) {
+
return (NamedList<Object>) codec.unmarshal(body);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "parsing error", e);
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java b/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
index 0563c9e..fd56d9d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
+++ b/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
@@ -409,6 +409,7 @@ public class SolrDocument extends SolrDocumentBase<Object, SolrDocument> impleme
@Override
public int getChildDocumentCount() {
+ if (_childDocuments == null) return 0;
return _childDocuments.size();
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/DataEntry.java b/solr/solrj/src/java/org/apache/solr/common/util/DataEntry.java
new file mode 100644
index 0000000..04b3a2d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/DataEntry.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This represents a data entry in the payload/stream. There are multiple ways to consume the data entry
+ * a) listen to it, if it's a container object, and get callbacks for each sub-entry
+ * b) read as an object using the {@link #val()} method. Please note that this creates objects, so expect higher memory usage
+ * c) read the corresponding primitive value
+ * Do not keep a reference of this Object beyond the scope where it is called. Read the relevant data out.
+ */
+public interface DataEntry {
+ /**
+ * The data type
+ */
+ DataEntry.Type type();
+
+ /**
+ * The index of this entry in the container
+ */
+ long index();
+
+ int intVal();
+
+ long longVal();
+
+ float floatVal();
+
+ double doubleVal();
+
+ boolean boolVal();
+
+ /**
+  * The value of this entry rendered as a string.
+  *
+  * @return {@code null} if {@link #type()} or {@link #val()} is {@code null},
+  *         otherwise the result of {@code val().toString()}
+  */
+ default String strValue() {
+   if (type() == null) return null;
+   // val() may legitimately yield null; do not dereference it blindly
+   Object value = val();
+   return value == null ? null : value.toString();
+ }
+
+ /**
+ * The object value
+ */
+ Object val();
+
+ /**
+ * Register a listener to get callbacks for all entries
+ *
+ * @param ctx This is any object that should be shared with the child entry callbacks
+ * @param listener The listener that handles each entry in this container
+ */
+ void listenContainer(Object ctx, EntryListener listener);
+
+ /**
+ * Some Objects may have metadata. usually there is none
+ */
+
+ Object metadata();
+
+ /**Depth of this Object. The root most object has a depth of 1
+ */
+ int depth();
+
+ /**
+ * If this is a child of another container object this returns a non-null value
+ *
+ * @return the parent container object
+ */
+ DataEntry parent();
+
+ /**
+ * This is the object shared in the parent container in the {{@link #listenContainer(Object, EntryListener)}} method
+ */
+ Object ctx();
+
+ /**
+ * If it is a non-primitive type and the size is known in advance
+ *
+ * if it's a map/list, it's the no:of items in this container
+ *
+ * if it's a {{@link CharSequence}} or byte[] , it's the no:of bytes in the stream
+ *
+ * @return a number greater than or equal to zero if the size is known, -1 if unknown
+ */
+ int length();
+
+ /**
+ * If this object is a key value entry. key value entries have name
+ */
+ boolean isKeyValEntry();
+
+ /**
+ * The name, if this is a map entry , else it returns a null
+ */
+ CharSequence name();
+
+ /**
+ * The types are a superset of json
+ */
+ enum Type {
+ NULL(true),
+ LONG(true),
+ INT(true),
+ BOOL(true),
+ FLOAT(true),
+ DOUBLE(true),
+ DATE(true),
+ /**
+ * A map like json object
+ */
+ KEYVAL_ITER(false, true),
+ /**
+ * An array like json object
+ */
+ ENTRY_ITER(false, true),
+ STR(false),
+ BYTEARR(false),
+ /**
+ * don't know how to stream it. read as an object using {{@link DataEntry#val()}} method
+ */
+ JAVA_OBJ(false);
+ /**
+ * A primitive type which usually maps to a java primitive
+ */
+ public final boolean isPrimitive;
+
+ public final boolean isContainer;
+
+ Type(boolean isPrimitive) {
+ this(isPrimitive, false);
+ }
+
+ Type(boolean isPrimitive, boolean isContainer) {
+ this.isPrimitive = isPrimitive;
+ this.isContainer = isContainer;
+ }
+ }
+
+ interface EntryListener {
+
+ /**
+ * Callback for each entry in this container. once the method call returns, the entry object is not valid anymore
+ * It is usually reused.
+ * If the object value is a {{@link Utf8CharSequence}} do a {{@link Object#clone()}} because the object may be reused
+ *
+ * @param e The entry in the container
+ */
+ void entry(DataEntry e);
+
+ /**
+ * Callback after all entries of this container are streamed
+ *
+ * @param e the container entry
+ */
+ default void end(DataEntry e) {
+ }
+ }
+
+ /**
+ * A decoder that reports the entries of a stream to an {@link EntryListener}.
+ */
+ interface FastDecoder {
+
+ /**
+ * Sets the stream to decode.
+ *
+ * @param is the input stream to read from
+ * @return a {@link FastDecoder}, so that calls can be chained
+ */
+ FastDecoder withInputStream(InputStream is);
+
+ /**
+ * Decodes the stream and passes the entries to the listener.
+ *
+ * @param iterListener the listener that receives the entries
+ * @throws IOException if reading the stream fails
+ */
+ Object decode(EntryListener iterListener) throws IOException;
+
+ }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/DataInputInputStream.java b/solr/solrj/src/java/org/apache/solr/common/util/DataInputInputStream.java
index b8a07be..222bebd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/DataInputInputStream.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/DataInputInputStream.java
@@ -18,9 +18,23 @@ package org.apache.solr.common.util;
import java.io.DataInput;
import java.io.InputStream;
+import java.nio.ByteBuffer;
/**
* An abstract DataInput that extends InputStream
*/
public abstract class DataInputInputStream extends InputStream implements DataInput {
+
+ /**If possible, read UTF8 bytes directly from the underlying buffer.
+ * This base implementation never reads anything and always returns false.
+ *
+ * @param utf8 the utf8 object to read into
+ * @param len length of the utf8 stream
+ * @return whether it is possible to do a direct read or not
+ */
+ boolean readDirectUtf8(ByteArrayUtf8CharSequence utf8, int len){return false;}
+
+ /**If possible, read ByteBuffer directly from the underlying buffer.
+ * This base implementation always returns null.
+ * @param sz the size of the buffer to be read
+ * @return the buffer, or null when a direct read is not possible
+ */
+ public ByteBuffer readDirectByteBuffer(int sz){return null;};
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java b/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
index bbcc129..f7d633d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/FastInputStream.java
@@ -44,6 +44,13 @@ public class FastInputStream extends DataInputInputStream {
this.end = end;
}
+ @Override
+ boolean readDirectUtf8(ByteArrayUtf8CharSequence utf8, int len) {
+ if (in != null || end < pos + len) return false;
+ utf8.reset(buf, pos, len, null);
+ pos = pos + len;
+ return true;
+ }
public static FastInputStream wrap(InputStream in) {
return (in instanceof FastInputStream) ? (FastInputStream)in : new FastInputStream(in);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/FastJavaBinDecoder.java b/solr/solrj/src/java/org/apache/solr/common/util/FastJavaBinDecoder.java
new file mode 100644
index 0000000..2ded345
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/FastJavaBinDecoder.java
@@ -0,0 +1,829 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.DataEntry.EntryListener;
+
+import static org.apache.solr.common.util.FastJavaBinDecoder.Tag._EXTERN_STRING;
+import static org.apache.solr.common.util.JavaBinCodec.*;
+
+public class FastJavaBinDecoder implements DataEntry.FastDecoder {
+ private StreamCodec codec;
+ private EntryImpl rootEntry = new EntryImpl();
+ private InputStream stream;
+
+ private static final DataEntry.EntryListener emptylistener = e -> {
+ };
+
+
+ /**
+ * Remembers the stream that a later {@code decode} call wraps in a {@code StreamCodec}.
+ *
+ * @param is the stream to decode
+ * @return this decoder
+ */
+ @Override
+ public FastJavaBinDecoder withInputStream(InputStream is) {
+ this.stream = is;
+ return this;
+ }
+
+ /**
+  * Decodes the stream set through {@code withInputStream}. The root entry is read and handed
+  * to the listener; if it is a container and a listener was registered on it during that
+  * callback, its children are streamed as well.
+  *
+  * @param listener receives the root entry; {@code null} is treated as a no-op listener
+  * @return the {@code ctx} object registered on the root entry (may be null)
+  * @throws IOException if reading the stream fails
+  */
+ @Override
+ public Object decode(EntryListener listener) throws IOException {
+   // Use the substituted listener everywhere; calling the raw argument would NPE on null.
+   EntryListener effectiveListener = listener == null ? emptylistener : listener;
+   rootEntry.entryListener = effectiveListener;
+   codec = new StreamCodec(stream);
+   codec.start();
+   EntryImpl entry = codec.beginRead(rootEntry);
+   effectiveListener.entry(entry);
+   if (entry.tag.type.isContainer && entry.entryListener != null) {
+     entry.tag.stream(entry, codec);
+   }
+   return entry.ctx;
+ }
+
+
+ /**
+  * A {@link JavaBinCodec} bound to a single {@link FastInputStream}, with helpers to read
+  * tags, keys and entries lazily.
+  */
+ static class StreamCodec extends JavaBinCodec {
+
+   final FastInputStream dis;
+
+   StreamCodec(InputStream is) {
+     this.dis = FastInputStream.wrap(is);
+   }
+
+
+   /**
+    * Reads and discards {@code sz} bytes from the stream.
+    * NOTE(review): relies on the {@code bytes} scratch buffer (declared outside this class)
+    * being allocated and non-empty - confirm.
+    *
+    * @param sz number of bytes to discard
+    * @throws java.io.EOFException if the stream ends before {@code sz} bytes were skipped
+    * @throws IOException if reading fails
+    */
+   public void skip(int sz) throws IOException {
+     while (sz > 0) {
+       int read = dis.read(bytes, 0, Math.min(bytes.length, sz));
+       if (read < 0) {
+         // read() signals EOF with -1; subtracting it would grow sz and loop forever
+         throw new java.io.EOFException("Stream ended with " + sz + " bytes still to skip");
+       }
+       sz -= read;
+     }
+
+   }
+
+
+   /** Initialises the codec on {@link #dis}. */
+   void start() throws IOException {
+     _init(dis);
+   }
+
+
+   /**
+    * Reads the next tag byte into {@code tagByte} and maps it to a {@link Tag}.
+    * Tags identified by their upper 3 bits are matched first; everything else is looked up
+    * in {@code lower5BitTags}.
+    *
+    * @throws RuntimeException if the byte maps to no known tag
+    */
+   Tag getTag() throws IOException {
+     tagByte = dis.readByte();
+     switch (tagByte >>> 5) {
+       case STR >>> 5:
+         return Tag._STR;
+       case SINT >>> 5:
+         return Tag._SINT;
+       case SLONG >>> 5:
+         return Tag._SLONG;
+       case ARR >>> 5:
+         return Tag._ARR;
+       case ORDERED_MAP >>> 5:
+         return Tag._ORDERED_MAP;
+       case NAMED_LST >>> 5:
+         return Tag._NAMED_LST;
+       case EXTERN_STRING >>> 5:
+         return _EXTERN_STRING;
+     }
+
+     Tag t = lower5BitTags[tagByte];
+     if (t == null) throw new RuntimeException("Invalid type " + tagByte);
+     return t;
+   }
+
+   /**
+    * Reads {@code sz} bytes as a {@link ByteBuffer}, directly from the stream's buffer when
+    * the stream supports it, otherwise by copying into a new array.
+    *
+    * @param dis the stream to read from
+    * @param sz number of bytes to read; the caller has already consumed the length prefix
+    */
+   public ByteBuffer readByteBuffer(DataInputInputStream dis, int sz) throws IOException {
+     ByteBuffer result = dis.readDirectByteBuffer(sz);
+     if(result != null) return result;
+     // The length was already read by the caller; reading another VInt here would
+     // consume payload bytes and corrupt the stream position.
+     byte[] arr = new byte[sz];
+     dis.readFully(arr);
+     return ByteBuffer.wrap(arr);
+   }
+
+   /**
+    * Reads the key of a key/value entry.
+    *
+    * @param ktag the tag that was read for the key
+    * @return the key, or null if the key tag is of type NULL
+    * @throws RuntimeException if the tag is neither a string nor a null
+    */
+   public CharSequence readObjKey(Tag ktag) throws IOException {
+     CharSequence key = null;
+     if (ktag.type == DataEntry.Type.STR) {
+       if (ktag == _EXTERN_STRING) key = readExternString(dis);
+       else key = readStr(dis);
+     } else if (ktag.type == DataEntry.Type.NULL) {
+       //no need to do anything
+     } else {
+       throw new RuntimeException("Key must be String");
+     }
+     return key;
+   }
+
+   /**
+    * Starts reading the next entry: reuses (and resets) the child entry of {@code parent},
+    * reads its tag and lets the tag perform its lazy read. Primitive entries are marked as
+    * fully consumed.
+    */
+   public EntryImpl beginRead(EntryImpl parent) throws IOException {
+     EntryImpl entry = parent.getChildAndReset();
+     entry.tag = getTag();
+     entry.tag.lazyRead(entry, this);
+     if (entry.tag.type.isPrimitive) entry.consumedFully = true;
+     return entry;
+   }
+ }
+
+
+ /**
+  * The {@link DataEntry} implementation used by this decoder. Every entry owns at most one
+  * child entry which is reset and reused for each element read below it, so instances must
+  * not be retained by listeners.
+  */
+ public class EntryImpl implements DataEntry {
+   // value returned by length(); -1 until a tag sets it
+   int size = -1;
+   Tag tag;
+   Object metadata;
+   EntryImpl parent, child;
+   long numericVal;
+   double doubleVal;
+   // cached result of val()
+   Object objVal;
+   // object registered through listenContainer(); children see it via ctx()
+   public Object ctx;
+   boolean boolVal;
+   boolean mapEntry;
+   long idx;
+
+   EntryListener entryListener;
+
+   boolean consumedFully = false;
+   int depth = 0;
+   CharSequence name;
+
+
+   /**
+    * Returns the single child entry of this entry, creating it on first use, after
+    * resetting it to its pristine state.
+    */
+   EntryImpl getChildAndReset() {
+     if (child == null) {
+       child = new EntryImpl();
+       child.parent = this;
+       child.depth = depth + 1;
+     }
+     child.reset();
+     return child;
+
+   }
+
+   @Override
+   public long index() {
+     return idx;
+   }
+
+   @Override
+   public int length() {
+     return size;
+   }
+
+   public Tag getTag() {
+     return tag;
+   }
+
+   @Override
+   public boolean boolVal() {
+     return boolVal;
+   }
+
+   @Override
+   public boolean isKeyValEntry() {
+     return mapEntry;
+   }
+
+   @Override
+   public CharSequence name() {
+     return name;
+   }
+
+   @Override
+   public int depth() {
+     return depth;
+   }
+
+   @Override
+   public DataEntry parent() {
+     return parent;
+   }
+
+   @Override
+   public Object metadata() {
+     return metadata;
+   }
+
+   /** @return the object registered on the parent entry, or null if there is no parent */
+   @Override
+   public Object ctx() {
+     return parent == null ? null : parent.ctx;
+   }
+
+   @Override
+   public Type type() {
+     return tag.type;
+   }
+
+   @Override
+   public int intVal() {
+     return (int) numericVal;
+   }
+
+   @Override
+   public long longVal() {
+     return numericVal;
+   }
+
+   /**
+    * @return the float value; for entries that are not of type FLOAT the value is obtained
+    *         by reading the object through {@link #val()} and converting it as a {@link Number}
+    */
+   @Override
+   public float floatVal() {
+     if(tag.type == Type.FLOAT) return (float) doubleVal;
+     else {
+       return ((Number) val()).floatValue();
+     }
+   }
+
+   @Override
+   public double doubleVal() {
+     return doubleVal;
+   }
+
+   /**
+    * Reads the value as an object through the tag and caches it. The entry is marked as
+    * fully consumed whether or not the read succeeds.
+    *
+    * @throws RuntimeException wrapping the {@link IOException} if the stream cannot be read
+    */
+   @Override
+   public Object val() {
+     if (objVal != null) return objVal;
+     try {
+       return objVal = tag.readObject(codec, this);
+     } catch (IOException e) {
+       throw new RuntimeException("Error with stream", e);
+     } finally {
+       consumedFully = true;
+     }
+   }
+
+   @Override
+   public void listenContainer(Object ctx, EntryListener listener) {
+     this.entryListener = listener;
+     this.ctx = ctx;
+   }
+
+   /**
+    * Clears all per-element state so that the instance can be reused for the next element.
+    * The links to parent and child and the depth are kept.
+    */
+   void reset() {
+     this.doubleVal = 0.0d;
+     this.numericVal = 0L;
+     this.objVal = null;
+     this.ctx = null;
+     this.entryListener = null;
+     this.size = -1;
+     this.tag = null;
+     consumedFully = false;
+     metadata = null;
+     name = null;
+     idx = -1;
+     // These two were previously left untouched, so a reused entry could report the
+     // key/value flag or boolean value of the element that was read before it.
+     boolVal = false;
+     mapEntry = false;
+   }
+
+   /** Notifies the registered listener, if any, that this container has ended. */
+   public void callEnd() {
+     if (entryListener != null) entryListener.end(this);
+
+   }
+ }
+
+ static final boolean LOWER_5_BITS = true;
+ static final boolean UPPER_3_BITS = false;
+
+ public enum Tag {
+ _NULL(NULL, LOWER_5_BITS, DataEntry.Type.NULL),
+ _BOOL_TRUE(BOOL_TRUE, LOWER_5_BITS, DataEntry.Type.BOOL) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) {
+ entry.boolVal = true;
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Boolean.TRUE;
+ }
+ },
+ _BOOL_FALSE(BOOL_FALSE, LOWER_5_BITS, DataEntry.Type.BOOL) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) {
+ entry.boolVal = false;
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Boolean.FALSE;
+ }
+ },
+ _BYTE(BYTE, LOWER_5_BITS, DataEntry.Type.INT) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.numericVal = streamCodec.dis.readByte();
+ entry.consumedFully = true;
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Byte.valueOf((byte) entry.numericVal);
+ }
+ },
+ _SHORT(SHORT, LOWER_5_BITS, DataEntry.Type.INT) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.numericVal = streamCodec.dis.readShort();
+ entry.consumedFully = true;
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Short.valueOf((short) entry.numericVal);
+ }
+ },
+ _DOUBLE(DOUBLE, LOWER_5_BITS, DataEntry.Type.DOUBLE) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.doubleVal = streamCodec.dis.readDouble();
+ entry.consumedFully = true;
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Double.valueOf(entry.doubleVal);
+ }
+ },
+ _INT(INT, LOWER_5_BITS, DataEntry.Type.INT) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.numericVal = streamCodec.dis.readInt();
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Integer.valueOf((int) entry.numericVal);
+ }
+
+ },//signed integer
+ _LONG(LONG, LOWER_5_BITS, DataEntry.Type.LONG) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.numericVal = streamCodec.dis.readLong();
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Long.valueOf(entry.numericVal);
+ }
+ },
+ _FLOAT(FLOAT, LOWER_5_BITS, DataEntry.Type.FLOAT) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.doubleVal = streamCodec.dis.readFloat();
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Float.valueOf((float) entry.doubleVal);
+ }
+ },
+ _DATE(DATE, LOWER_5_BITS, DataEntry.Type.DATE) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec streamCodec) throws IOException {
+ entry.numericVal = streamCodec.dis.readLong();
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return new Date(entry.numericVal);
+ }
+ },
+ _MAP(MAP, LOWER_5_BITS, DataEntry.Type.KEYVAL_ITER) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.size = readObjSz(codec, entry.tag);
+ }
+
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ try {
+ for (int i = 0; i < entry.size; i++) {
+ CharSequence key = codec.readObjKey(codec.getTag());
+ callbackMapEntryListener(entry, key, codec, i);
+ }
+ } finally {
+ entry.callEnd();
+ }
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readMap(codec.dis,entry.size);
+ }
+ },
+ _SOLRDOC(SOLRDOC, LOWER_5_BITS, DataEntry.Type.KEYVAL_ITER) {
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ try {
+ codec.getTag();
+ entry.size = codec.readSize(codec.dis);// readObjSz(codec, entry.tag);
+ for (int i = 0; i < entry.size; i++) {
+ Tag tag = codec.getTag();
+ if (tag == _SOLRDOC) {
+ EntryImpl e = entry.getChildAndReset();
+ e.tag = tag;
+ e.idx = i;
+ Tag.callbackIterListener(entry, e, codec);
+ } else {
+ CharSequence key = codec.readObjKey(tag);
+ callbackMapEntryListener(entry, key, codec, i);
+ }
+
+ }
+ } finally {
+ entry.callEnd();
+
+ }
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readSolrDocument(codec.dis);
+ }
+ },
+ _SOLRDOCLST(SOLRDOCLST, LOWER_5_BITS, DataEntry.Type.ENTRY_ITER) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.metadata = codec.readVal(codec.dis);
+ codec.getTag();//ignore this
+ entry.size = codec.readSize(codec.dis);
+ }
+
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ try {
+ for (int i = 0; i < entry.size; i++) {
+ EntryImpl newEntry = codec.beginRead(entry);
+ newEntry.idx = i;
+ Tag.callbackIterListener(entry, newEntry, codec);
+ }
+ } finally {
+ entry.callEnd();
+ }
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ SolrDocumentList solrDocs = new SolrDocumentList();
+ if(entry.metadata != null){
+ List list = (List) entry.metadata;
+ solrDocs.setNumFound((Long) list.get(0));
+ solrDocs.setStart((Long) list.get(1));
+ solrDocs.setMaxScore((Float) list.get(2));
+ }
+ List<SolrDocument> l = codec.readArray(codec.dis, entry.size);
+ solrDocs.addAll(l);
+ return solrDocs;
+ }
+ },
+ _BYTEARR(BYTEARR, LOWER_5_BITS, DataEntry.Type.BYTEARR) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.size = readVInt(codec.dis);
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ ByteBuffer buf = codec.readByteBuffer(codec.dis, entry.size);
+ entry.size = buf.limit() - buf.position();
+ return buf;
+ }
+
+ @Override
+ public void skip(EntryImpl entry, StreamCodec codec) throws IOException {
+ codec.skip(entry.size);
+ }
+ },
+ _ITERATOR(ITERATOR, LOWER_5_BITS, DataEntry.Type.ENTRY_ITER) {
+ /**
+  * Streams the elements of an iterator, one at a time, to the listener registered on
+  * {@code entry}. The number of elements is not known up front; reading stops when the
+  * {@code _END} tag is encountered. Every element handed to the listener carries a
+  * consecutive, zero-based index.
+  */
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+   try {
+     long idx = 0;
+     while (true) {
+       EntryImpl newEntry = codec.beginRead(entry);
+       if (newEntry.tag == _END) break;
+       // assign the index exactly once per element; incrementing it twice per iteration
+       // made the listener see 1, 3, 5, ... instead of 0, 1, 2, ...
+       newEntry.idx = idx++;
+       Tag.callbackIterListener(entry, newEntry, codec);
+     }
+   } finally {
+     entry.callEnd();
+   }
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readIterator(codec.dis);
+ }
+ },
+
+ _END(END, LOWER_5_BITS, null),
+
+ _SOLRINPUTDOC(SOLRINPUTDOC, LOWER_5_BITS, DataEntry.Type.JAVA_OBJ) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.objVal = readObject(codec, entry);
+ entry.consumedFully = true;
+ }
+ },
+ _MAP_ENTRY_ITER(MAP_ENTRY_ITER, LOWER_5_BITS, DataEntry.Type.KEYVAL_ITER) {
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ long idx = 0;
+ for (; ; ) {
+ Tag tag = codec.getTag();
+ if (tag == Tag._END) break;
+ CharSequence key = codec.readObjKey(tag);
+ callbackMapEntryListener(entry, key, codec, idx++);
+ }
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readMapIter(codec.dis);
+ }
+ },
+ _ENUM_FIELD_VALUE(ENUM_FIELD_VALUE, LOWER_5_BITS, DataEntry.Type.JAVA_OBJ) {
+
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.objVal =codec.readEnumFieldValue(codec.dis);
+ entry.consumedFully = true;
+ }
+ },
+ _MAP_ENTRY(MAP_ENTRY, LOWER_5_BITS, DataEntry.Type.JAVA_OBJ) {
+ //doesn't support streaming
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.objVal = codec.readMapEntry(codec.dis);
+ entry.consumedFully = true;
+ }
+ },
+ // types that combine tag + length (or other info) in a single byte
+ _TAG_AND_LEN(TAG_AND_LEN, UPPER_3_BITS, null),
+ _STR(STR, UPPER_3_BITS, DataEntry.Type.STR) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.size = readObjSz(codec, this);
+
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readUtf8(codec.dis);
+ }
+
+ @Override
+ public void skip(EntryImpl entry, StreamCodec codec) throws IOException {
+ codec.skip(entry.size);
+ }
+ },
+ _SINT(SINT, UPPER_3_BITS, DataEntry.Type.INT) {//unsigned integer
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.numericVal = codec.readSmallInt(codec.dis);
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+ return Integer.valueOf((int) entry.numericVal);
+ }
+
+ },
+ _SLONG(SLONG, UPPER_3_BITS, DataEntry.Type.LONG) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.numericVal = codec.readSmallLong(codec.dis);
+ }
+
+ /**
+  * Returns the value captured by {@code lazyRead} as a boxed {@link Long}.
+  */
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) {
+   // numericVal is already a long: narrowing it to int before boxing would silently
+   // corrupt any value outside the int range
+   return Long.valueOf(entry.numericVal);
+ }
+
+
+ },
+ _ARR(ARR, UPPER_3_BITS, DataEntry.Type.ENTRY_ITER) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.size = readObjSz(codec, this);
+ }
+
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ for (int i = 0; i < entry.size; i++) {
+ EntryImpl newEntry = codec.beginRead(entry);
+ newEntry.idx = i;
+ Tag.callbackIterListener(entry, newEntry, codec);
+ }
+ }
+
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readArray(codec.dis);
+ }
+ }, //
+ _ORDERED_MAP(ORDERED_MAP, UPPER_3_BITS, DataEntry.Type.KEYVAL_ITER) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.size = readObjSz(codec, entry.tag);
+ }
+
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ _MAP.stream(entry, codec);
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readOrderedMap(codec.dis);
+ }
+
+ }, // SimpleOrderedMap (a NamedList subclass, and more common)
+ _NAMED_LST(NAMED_LST, UPPER_3_BITS, DataEntry.Type.KEYVAL_ITER) {
+ @Override
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+ entry.size = readObjSz(codec, entry.tag);
+ }
+
+ @Override
+ public void stream(EntryImpl entry, StreamCodec codec) throws IOException {
+ _MAP.stream(entry, codec);
+ }
+
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readNamedList(codec.dis);
+ }
+ }, // NamedList
+
+ _EXTERN_STRING(EXTERN_STRING, UPPER_3_BITS, DataEntry.Type.STR) {
+ @Override
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ return codec.readExternString(codec.dis);
+ }
+ };
+
+ /**
+ * Reads the size of the object identified by {@code tag} from the codec's input stream.
+ * Tags flagged as {@code isLower5Bits} have their size read as a VInt; all other tags
+ * delegate to {@code codec.readSize}.
+ */
+ private static int readObjSz(StreamCodec codec, Tag tag) throws IOException {
+ return tag.isLower5Bits ?
+ StreamCodec.readVInt(codec.dis) :
+ codec.readSize(codec.dis);
+ }
+
+ /**
+ * Begins reading the value that follows {@code key}, marks it as a map entry carrying that
+ * name and the given index, and passes it to the listener registered on {@code entry}
+ * (if there is one).
+ */
+ private static void callbackMapEntryListener(EntryImpl entry, CharSequence key, StreamCodec codec, long idx)
+ throws IOException {
+ EntryImpl newEntry = codec.beginRead(entry);
+ newEntry.name = key;
+ newEntry.mapEntry = true;
+ newEntry.idx = idx;
+ try {
+ if (entry.entryListener != null) entry.entryListener.entry(newEntry);
+ } finally {
+ // always runs, even if the listener threw; postCallback only does work when the
+ // entry has not been marked as fully consumed
+ postCallback(codec, newEntry);
+ }
+ }
+
+ /**
+ * Marks {@code newEntry} as a plain (non key-value) entry and passes it to the listener
+ * registered on {@code parent} (if there is one).
+ */
+ private static void callbackIterListener(EntryImpl parent, EntryImpl newEntry, StreamCodec codec)
+ throws IOException {
+ try {
+ newEntry.mapEntry = false;
+ if(parent.entryListener != null) parent.entryListener.entry(newEntry);
+ } finally {
+ // always runs, even if the listener threw; postCallback only does work when the
+ // entry has not been marked as fully consumed
+ postCallback(codec, newEntry);
+ }
+ }
+
+ /**
+ * Invoked after a listener callback. If the entry has not been marked as fully consumed,
+ * a container entry is streamed and any other entry is skipped via its tag
+ * (presumably so that the input ends up positioned after the entry - confirm against
+ * StreamCodec).
+ */
+ private static void postCallback(StreamCodec codec, EntryImpl newEntry) throws IOException {
+ if (!newEntry.consumedFully) {
+ if (newEntry.tag.type.isContainer) {
+ // container entry: stream its children; when the callback registered no listener,
+ // fall back to emptylistener
+ if (newEntry.entryListener == null) newEntry.entryListener = emptylistener;
+ newEntry.tag.stream(newEntry, codec);
+ } else {
+ newEntry.tag.skip(newEntry, codec);
+ }
+ }
+ }
+
+
+ final int code;
+ final boolean isLower5Bits;
+ final DataEntry.Type type;
+
+ Tag(int code, boolean isLower5Bits, DataEntry.Type type) {
+ this.code = code;
+ this.isLower5Bits = isLower5Bits;
+ this.type = type;
+ }
+
+ /**
+ * This applies to only container Objects. This is invoked only if there is a corresponding listener.
+ *
+ */
+ public void stream(EntryImpl currentEntry, StreamCodec codec) throws IOException {
+
+
+ }
+
+ /**
+ * This should read the minimal data about the entry . if the data is a primitive type ,
+ * read the whole thing
+ */
+ public void lazyRead(EntryImpl entry, StreamCodec codec) throws IOException {
+
+ }
+
+ /**
+ * Read the entry as an Object. The behavior should be similar to that of {@link JavaBinCodec#readObject(DataInputInputStream)}
+ */
+ public Object readObject(StreamCodec codec, EntryImpl entry) throws IOException {
+ throw new RuntimeException("Unsupported object : " + this.name());
+ }
+
+ /**
+ * Reads past the entry and discards its data. This default implementation:
+ * <ul>
+ * <li>for {@code KEYVAL_ITER} and {@code ENTRY_ITER} entries, clears the entry's listener and
+ * streams its children;</li>
+ * <li>for any other non-primitive entry, falls back to {@code readObject} and drops the result,
+ * so an object may still be created (tags such as {@code _STR} and {@code _BYTEARR} override
+ * this method to skip the raw bytes instead);</li>
+ * <li>for primitive entries, does nothing.</li>
+ * </ul>
+ */
+ public void skip(EntryImpl entry, StreamCodec codec) throws IOException {
+ if (entry.tag.type == DataEntry.Type.KEYVAL_ITER || entry.tag.type == DataEntry.Type.ENTRY_ITER) {
+ entry.entryListener = null;
+ stream(entry, codec);
+ } else if (!entry.tag.type.isPrimitive) {
+ readObject(codec, entry);
+ }
+
+ }
+ }
+
+ static final private Tag[] lower5BitTags = new Tag[32];
+
+ static {
+ for (Tag tag : Tag.values()) {
+ if (tag.isLower5Bits) {
+ lower5BitTags[tag.code] = tag;
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ for (int i = 0; i < lower5BitTags.length; i++) {
+ Tag tag = lower5BitTags[i];
+ if (tag == null) continue;
+ System.out.println(tag.name() + " : " + tag.code + (tag.isLower5Bits ? " lower" : " upper"));
+ }
+ }
+
+
+ /**
+ * Listener callback (exposed through {@code ENTRY_LISTENER}) that materializes entries into
+ * plain Java collections.
+ * <p>
+ * A container entry gets a new {@link LinkedHashMap} (for {@code KEYVAL_ITER}) or
+ * {@link ArrayList} (otherwise), which is attached to the parent context when one exists and
+ * then registered, together with this same listener, as the context for the container's children.
+ * A non-container entry has its value attached to the parent context when one exists.
+ */
+ private static void addObj(DataEntry e) {
+ if (e.type().isContainer) {
+ Object ctx = e.type() == DataEntry.Type.KEYVAL_ITER ?
+ new LinkedHashMap(getSize(e)) :
+ new ArrayList(getSize(e));
+ if (e.ctx() != null) {
+ if (e.isKeyValEntry()) {
+ ((Map) e.ctx()).put(e.name(), ctx);
+ } else {
+ ((Collection) e.ctx()).add(ctx);
+ }
+ }
+ // children of this container are delivered to the same listener, with ctx as their context
+ e.listenContainer(ctx, getEntryListener());
+ } else {
+ Object val = e.val();
+ // NOTE(review): the clone is presumably needed because the decoder reuses the backing
+ // buffer of Utf8CharSequence values - confirm
+ if (val instanceof Utf8CharSequence) val = ((Utf8CharSequence) val).clone();
+ if (e.ctx() != null) {
+ if (e.isKeyValEntry()) {
+ ((Map) e.ctx()).put(e.name(), val);
+ } else {
+ ((Collection) e.ctx()).add(val);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the initial capacity to use for the collection backing a container entry:
+ * {@code e.length()}, or a default (16 for {@code KEYVAL_ITER}, 10 otherwise) when the
+ * length is reported as -1 (presumably meaning "unknown").
+ */
+ private static int getSize(DataEntry e) {
+ int sz = e.length();
+ if (sz == -1) sz = e.type() == DataEntry.Type.KEYVAL_ITER ? 16 : 10;
+ return sz;
+ }
+
+
+ public static EntryListener getEntryListener() {
+ return ENTRY_LISTENER;
+ }
+
+
+ static final EntryListener ENTRY_LISTENER = FastJavaBinDecoder::addObj;
+
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
index 782d109..9535ee1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
@@ -202,7 +202,7 @@ public class JavaBinCodec implements PushWriter {
return _init(dis);
}
- private FastInputStream _init(FastInputStream dis) throws IOException {
+ protected FastInputStream _init(FastInputStream dis) throws IOException {
version = dis.readByte();
if (version != VERSION) {
throw new RuntimeException("Invalid version (expected " + VERSION +
@@ -683,6 +683,10 @@ public class JavaBinCodec implements PushWriter {
public Map<Object,Object> readMap(DataInputInputStream dis)
throws IOException {
int sz = readVInt(dis);
+ return readMap(dis, sz);
+ }
+
+ protected Map<Object, Object> readMap(DataInputInputStream dis, int sz) throws IOException {
Map<Object, Object> m = newMap(sz);
for (int i = 0; i < sz; i++) {
Object key = readVal(dis);
@@ -780,6 +784,10 @@ public class JavaBinCodec implements PushWriter {
public List<Object> readArray(DataInputInputStream dis) throws IOException {
int sz = readSize(dis);
+ return readArray(dis, sz);
+ }
+
+ protected List readArray(DataInputInputStream dis, int sz) throws IOException {
ArrayList<Object> l = new ArrayList<>(sz);
for (int i = 0; i < sz; i++) {
l.add(readVal(dis));
@@ -796,8 +804,8 @@ public class JavaBinCodec implements PushWriter {
writeInt(enumFieldValue.toInt());
writeStr(enumFieldValue.toString());
}
-
- public void writeMapEntry(Entry<Object,Object> val) throws IOException {
+
+ public void writeMapEntry(Map.Entry val) throws IOException {
writeTag(MAP_ENTRY);
writeVal(val.getKey());
writeVal(val.getValue());
@@ -926,12 +934,27 @@ public class JavaBinCodec implements PushWriter {
protected CharSequence readUtf8(DataInputInputStream dis) throws IOException {
int sz = readSize(dis);
+ return readUtf8(dis, sz);
+ }
+
+ protected CharSequence readUtf8(DataInputInputStream dis, int sz) throws IOException {
+ ByteArrayUtf8CharSequence result = new ByteArrayUtf8CharSequence(null,0,0);
+ if(dis.readDirectUtf8(result, sz)){
+ result.stringProvider= getStringProvider();
+ return result;
+ }
+
if (sz > MAX_UTF8_SZ) return _readStr(dis, null, sz);
if (bytesBlock == null) bytesBlock = new BytesBlock(1024 * 4);
BytesBlock block = this.bytesBlock.expand(sz);
dis.readFully(block.getBuf(), block.getStartPos(), sz);
- ByteArrayUtf8CharSequence result = new ByteArrayUtf8CharSequence(block.getBuf(), block.getStartPos(), sz);
+ result.reset(block.getBuf(), block.getStartPos(), sz,null);
+ result.stringProvider = getStringProvider();
+ return result;
+ }
+
+ private Function<ByteArrayUtf8CharSequence, String> getStringProvider() {
if (stringProvider == null) {
stringProvider = butf8cs -> {
synchronized (JavaBinCodec.this) {
@@ -941,8 +964,7 @@ public class JavaBinCodec implements PushWriter {
}
};
}
- result.stringProvider = this.stringProvider;
- return result;
+ return this.stringProvider;
}
public void writeInt(int val) throws IOException {
@@ -1231,66 +1253,6 @@ public class JavaBinCodec implements PushWriter {
}
}
- public static class StringBytes {
- byte[] bytes;
-
- /**
- * Offset of first valid byte.
- */
- int offset;
-
- /**
- * Length of used bytes.
- */
- private int length;
- private int hash;
-
- public StringBytes(byte[] bytes, int offset, int length) {
- reset(bytes, offset, length);
- }
-
- StringBytes reset(byte[] bytes, int offset, int length) {
- this.bytes = bytes;
- this.offset = offset;
- this.length = length;
- hash = bytes == null ? 0 : Hash.murmurhash3_x86_32(bytes, offset, length, 0);
- return this;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
- if (other instanceof StringBytes) {
- return this.bytesEquals((StringBytes) other);
- }
- return false;
- }
-
- boolean bytesEquals(StringBytes other) {
- assert other != null;
- if (length == other.length) {
- int otherUpto = other.offset;
- final byte[] otherBytes = other.bytes;
- final int end = offset + length;
- for (int upto = offset; upto < end; upto++, otherUpto++) {
- if (bytes[upto] != otherBytes[otherUpto]) {
- return false;
- }
- }
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return hash;
- }
- }
-
@Override
public void close() throws IOException {
if (daos != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/StringBytes.java b/solr/solrj/src/java/org/apache/solr/common/util/StringBytes.java
new file mode 100644
index 0000000..2c95916
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/StringBytes.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+/**
+ * A mutable view over a slice of a byte array whose hash code is computed once, at
+ * construction time and on every {@link #reset}, from the bytes of the slice. Two instances are
+ * equal when their slices have the same length and the same content.
+ */
+public class StringBytes {
+  /** Backing array; when {@code null} the hash is 0. */
+  byte[] bytes;
+
+  /** Index of the first valid byte in {@link #bytes}. */
+  int offset;
+
+  /** Number of valid bytes, starting at {@link #offset}. */
+  int length;
+
+  /** Hash of the current slice, refreshed by {@link #reset}. */
+  private int hash;
+
+  public StringBytes(byte[] bytes, int offset, int length) {
+    reset(bytes, offset, length);
+  }
+
+  /**
+   * Points this instance at a new slice and recomputes its hash.
+   *
+   * @return this instance, to allow chaining
+   */
+  StringBytes reset(byte[] bytes, int offset, int length) {
+    this.bytes = bytes;
+    this.offset = offset;
+    this.length = length;
+    if (bytes == null) {
+      hash = 0;
+    } else {
+      hash = Hash.murmurhash3_x86_32(bytes, offset, length, 0);
+    }
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    // instanceof is false for null, so no separate null check is needed
+    return other instanceof StringBytes && bytesEquals((StringBytes) other);
+  }
+
+  /**
+   * Compares the two slices byte by byte.
+   *
+   * @return {@code true} when both slices have the same length and identical content
+   */
+  boolean bytesEquals(StringBytes other) {
+    assert other != null;
+    if (length != other.length) {
+      return false;
+    }
+    final byte[] theirs = other.bytes;
+    final int theirStart = other.offset;
+    for (int i = 0; i < length; i++) {
+      if (bytes[offset + i] != theirs[theirStart + i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return hash;
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utf8CharSequence.java b/solr/solrj/src/java/org/apache/solr/common/util/Utf8CharSequence.java
index 1533e5c..9f15afa 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utf8CharSequence.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utf8CharSequence.java
@@ -23,7 +23,7 @@ import java.io.OutputStream;
/**
* A byte[] backed String
*/
-public interface Utf8CharSequence extends CharSequence , Comparable {
+public interface Utf8CharSequence extends CharSequence , Comparable, Cloneable {
/**
* Write the bytes into a buffer. The objective is to avoid the local bytes being exposed to
@@ -71,4 +71,6 @@ public interface Utf8CharSequence extends CharSequence , Comparable {
}
}
+ Utf8CharSequence clone();
+
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index e36091c..d079052 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -84,6 +84,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class Utils {
public static final Function NEW_HASHMAP_FUN = o -> new HashMap<>();
+ public static final Function NEW_LINKED_HASHMAP_FUN = o -> new LinkedHashMap<>();
public static final Function NEW_ATOMICLONG_FUN = o -> new AtomicLong();
public static final Function NEW_ARRAYLIST_FUN = o -> new ArrayList<>();
public static final Function NEW_HASHSET_FUN = o -> new HashSet<>();
diff --git a/solr/solrj/src/test-files/solrj/javabin_sample.bin b/solr/solrj/src/test-files/solrj/javabin_sample.bin
new file mode 100644
index 0000000..23cf4bc
Binary files /dev/null and b/solr/solrj/src/test-files/solrj/javabin_sample.bin differ
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/TestFastJavabinDecoder.java b/solr/solrj/src/test/org/apache/solr/common/util/TestFastJavabinDecoder.java
new file mode 100644
index 0000000..79f1b28
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/util/TestFastJavabinDecoder.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.FastStreamingDocsCallback;
+import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
+import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.FastJavaBinDecoder.Tag;
+
+import static org.apache.solr.common.util.Utils.NEW_ARRAYLIST_FUN;
+import static org.apache.solr.common.util.Utils.NEW_LINKED_HASHMAP_FUN;
+
+public class TestFastJavabinDecoder extends SolrTestCaseJ4 {
+
+
+ public void testTagRead() throws Exception {
+ BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+ FastOutputStream faos = FastOutputStream.wrap(baos);
+
+ JavaBinCodec codec = new JavaBinCodec(faos, null);
+ codec.writeVal(10);
+ codec.writeVal(100);
+ codec.writeVal("Hello!");
+
+ faos.flushBuffer();
+ faos.close();
+
+
+ FastInputStream fis = new FastInputStream(null, baos.getbuf(), 0, baos.size());
+ FastJavaBinDecoder.StreamCodec scodec = new FastJavaBinDecoder.StreamCodec(fis);
+ scodec.start();
+ Tag tag = scodec.getTag();
+ assertEquals(Tag._SINT, tag);
+ assertEquals(10, scodec.readSmallInt(scodec.dis));
+ tag = scodec.getTag();
+ assertEquals(Tag._SINT, tag);
+ assertEquals(100, scodec.readSmallInt(scodec.dis));
+ tag = scodec.getTag();
+ assertEquals(Tag._STR, tag);
+ assertEquals("Hello!", scodec.readStr(fis));
+ }
+
+ public void testSimple() throws IOException {
+ String sampleObj = "{k : v , " +
+ "mapk : {k1: v1, k2 : [v2_1 , v2_2 ]}," +
+ "listk : [ 1, 2, 3 ]," +
+ "maps : [ {id: kov1}, {id : kov2} ,{id:kov3 , longv : 234} ]," +
+ "}";
+
+
+ Map m = (Map) Utils.fromJSONString(sampleObj);
+ BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+ new JavaBinCodec().marshal(m, baos);
+
+ Map m2 = (Map) new JavaBinCodec().unmarshal(new FastInputStream(null, baos.getbuf(), 0, baos.size()));
+
+ LinkedHashMap fastMap = (LinkedHashMap) new FastJavaBinDecoder()
+ .withInputStream(new FastInputStream(null, baos.getbuf(), 0, baos.size()))
+ .decode(FastJavaBinDecoder.getEntryListener());
+ assertEquals(Utils.writeJson(m2, new StringWriter(), true).toString(),
+ Utils.writeJson(fastMap, new StringWriter(), true).toString());
+
+ Object newMap = new FastJavaBinDecoder()
+ .withInputStream(new FastInputStream(null, baos.getbuf(), 0, baos.size()))
+ .decode(e -> {
+ e.listenContainer(new LinkedHashMap<>(), e_ -> {
+ Map rootMap = (Map) e_.ctx();
+ if (e_.type() == DataEntry.Type.ENTRY_ITER) {
+ e_.listenContainer(rootMap.computeIfAbsent(e_.name(), NEW_ARRAYLIST_FUN),
+ FastJavaBinDecoder.getEntryListener());
+ } else if (e_.type() == DataEntry.Type.KEYVAL_ITER) {
+ e_.listenContainer(rootMap.computeIfAbsent(e_.name(), NEW_LINKED_HASHMAP_FUN), e1 -> {
+ Map m1 = (Map) e1.ctx();
+ if ("k1".equals(e1.name())) {
+ m1.put(e1.name(), e1.val().toString());
+ }
+ //eat up k2
+ });
+ } else if (e_.type() == DataEntry.Type.STR) {
+ rootMap.put(e_.name(), e_.val().toString());
+ }
+
+ });
+ });
+ ((Map) m2.get("mapk")).remove("k2");
+ assertEquals(Utils.writeJson(m2, new StringWriter(), true).toString(),
+ Utils.writeJson(newMap, new StringWriter(), true).toString());
+
+ }
+
+ public void testFastJavabinStreamingDecoder() throws IOException {
+ BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+ try (InputStream is = getClass().getResourceAsStream("/solrj/javabin_sample.bin")) {
+ IOUtils.copy(is, baos);
+ }
+ SimpleOrderedMap o = (SimpleOrderedMap) new JavaBinCodec().unmarshal(baos.toByteArray());
+ SolrDocumentList list = (SolrDocumentList) o.get("response");
+ System.out.println(" " + list.getNumFound() + " , " + list.getStart() + " , " + list.getMaxScore());
+ class Pojo {
+ long _idx;
+ CharSequence id;
+ boolean inStock;
+ float price;
+ List<NamedList> children;
+ }
+ StreamingBinaryResponseParser parser = new StreamingBinaryResponseParser(new FastStreamingDocsCallback() {
+
+ @Override
+ public Object initDocList(Long numFound, Long start, Float maxScore) {
+ assertEquals((Long) list.getNumFound(), numFound);
+ assertEquals((Long) list.getStart(), start);
+ assertEquals(list.getMaxScore(), maxScore);
+ return new int[1];
+ }
+
+ @Override
+ public Object startDoc(Object docListObj) {
+ Pojo pojo = new Pojo();
+ pojo._idx = ((int[]) docListObj)[0]++;
+ return pojo;
+ }
+
+ @Override
+ public void field(DataEntry field, Object docObj) {
+ Pojo pojo = (Pojo) docObj;
+ if ("id".equals(field.name())) {
+ pojo.id = ((Utf8CharSequence) field.val()).clone();
+ } else if (field.type() == DataEntry.Type.BOOL && "inStock".equals(field.name())) {
+ pojo.inStock = field.boolVal();
+ } else if (field.type() == DataEntry.Type.FLOAT && "price".equals(field.name())) {
+ pojo.price = field.floatVal();
+ }
+
+ }
+
+ @Override
+ public void endDoc(Object docObj) {
+ Pojo pojo = (Pojo) docObj;
+ SolrDocument doc = list.get((int) pojo._idx);
+ assertEquals(doc.get("id"), pojo.id.toString());
+ if (doc.get("inStock") != null)
+ assertEquals(doc.get("inStock"), pojo.inStock);
+ if (doc.get("price") != null)
+ assertEquals((Float) doc.get("price"), pojo.price, 0.001);
+ }
+ });
+
+ parser.processResponse(new FastInputStream(null, baos.getbuf(), 0, baos.size()), null);
+
+
+ }
+
+ public void testParsingWithChildDocs() throws IOException {
+ SolrDocument d1 = TestJavaBinCodec.generateSolrDocumentWithChildDocs();
+ d1.setField("id", "101");
+ SolrDocument d2 = TestJavaBinCodec.generateSolrDocumentWithChildDocs();
+ d2.setField("id", "102");
+ d2.setField("longs", Arrays.asList(100l, 200l));
+
+ SolrDocumentList sdocs = new SolrDocumentList();
+ sdocs.setStart(0);
+ sdocs.setNumFound(2);
+ sdocs.add(d1);
+ sdocs.add(d2);
+
+ SimpleOrderedMap orderedMap = new SimpleOrderedMap();
+ orderedMap.add("response", sdocs);
+
+ BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+ new JavaBinCodec().marshal(orderedMap, baos);
+ boolean[] useListener = new boolean[1];
+ useListener[0] = true;
+
+ class Pojo {
+ CharSequence id;
+ CharSequence subject;
+ CharSequence cat;
+ long[] longs;
+ final List<Pojo> children = new ArrayList<>();
+
+ public void compare(SolrDocument d) {
+ assertEquals(id, d.getFieldValue("id"));
+ assertEquals(subject, d.getFieldValue("subject"));
+ assertEquals(cat, d.getFieldValue("cat"));
+ assertEquals(d.getChildDocumentCount(), children.size());
+ List<Long> l = (List<Long>) d.getFieldValue("longs");
+ if(l != null){
+ assertNotNull(longs);
+ for (int i = 0; i < l.size(); i++) {
+ Long v = l.get(i);
+ assertEquals(v.longValue(), longs[i]);
+ }
+ }
+ List<SolrDocument> childDocuments = d.getChildDocuments();
+ if (childDocuments == null) return;
+ for (int i = 0; i < childDocuments.size(); i++) {
+ children.get(i).compare(childDocuments.get(i));
+ }
+
+ }
+
+ }
+ List<Pojo> l = new ArrayList<>();
+ StreamingBinaryResponseParser binaryResponseParser = new StreamingBinaryResponseParser(new FastStreamingDocsCallback() {
+
+ @Override
+ public Object initDocList(Long numFound, Long start, Float maxScore) {
+ return l;
+ }
+
+ @Override
+ public Object startDoc(Object docListObj) {
+ Pojo pojo = new Pojo();
+ ((List) docListObj).add(pojo);
+ return pojo;
+ }
+
+ @Override
+ public void field(DataEntry field, Object docObj) {
+ Pojo pojo = (Pojo) docObj;
+ if (field.name().equals("id")) {
+ pojo.id = field.strValue();
+ } else if (field.name().equals("subject")) {
+ pojo.subject = field.strValue();
+ } else if (field.name().equals("cat")) {
+ pojo.cat = field.strValue();
+ } else if (field.type() == DataEntry.Type.ENTRY_ITER && "longs".equals(field.name())) {
+ if(useListener[0]){
+ field.listenContainer(pojo.longs = new long[field.length()], READLONGS);
+ } else {
+ List<Long> longList = (List<Long>) field.val();
+ pojo.longs = new long[longList.size()];
+ for (int i = 0; i < longList.size(); i++) {
+ pojo.longs[i] = longList.get(i);
+
+ }
+
+ }
+ }
+
+ }
+
+
+ @Override
+ public Object startChildDoc(Object parentDocObj) {
+ Pojo parent = (Pojo) parentDocObj;
+ Pojo child = new Pojo();
+ parent.children.add(child);
+ return child;
+ }
+ });
+ binaryResponseParser.processResponse(new FastInputStream(null, baos.getbuf(), 0, baos.size()), null);
+ for (int i = 0; i < sdocs.size(); i++) {
+ l.get(i).compare(sdocs.get(i));
+ }
+
+ l.clear();
+
+ useListener[0] = false;
+ binaryResponseParser.processResponse(new FastInputStream(null, baos.getbuf(), 0, baos.size()), null);
+ for (int i = 0; i < sdocs.size(); i++) {
+ l.get(i).compare(sdocs.get(i));
+ }
+
+
+ }
+
+ static final DataEntry.EntryListener READLONGS = e -> {
+ if (e.type() != DataEntry.Type.LONG) return;
+ long[] array = (long[]) e.ctx();
+ array[(int) e.index()] = e.longVal();
+
+ };
+}
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/TestJavaBinCodec.java b/solr/solrj/src/test/org/apache/solr/common/util/TestJavaBinCodec.java
index e60d8c7..6717375 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/TestJavaBinCodec.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/TestJavaBinCodec.java
@@ -65,7 +65,7 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
}
}
- private SolrDocument generateSolrDocumentWithChildDocs() {
+ public static SolrDocument generateSolrDocumentWithChildDocs() {
SolrDocument parentDocument = new SolrDocument();
parentDocument.addField("id", "1");
parentDocument.addField("subject", "parentDocument");
@@ -376,8 +376,8 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
}
private void testPerf() throws InterruptedException {
- final ArrayList<JavaBinCodec.StringBytes> l = new ArrayList<>();
- Cache<JavaBinCodec.StringBytes, String> cache = null;
+ final ArrayList<StringBytes> l = new ArrayList<>();
+ Cache<StringBytes, String> cache = null;
/* cache = new ConcurrentLRUCache<JavaBinCodec.StringBytes,String>(10000, 9000, 10000, 1000, false, true, null){
@Override
public String put(JavaBinCodec.StringBytes key, String val) {
@@ -388,12 +388,12 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
Runtime.getRuntime().gc();
printMem("before cache init");
- Cache<JavaBinCodec.StringBytes, String> cache1 = new MapBackedCache<>(new HashMap<>()) ;
+ Cache<StringBytes, String> cache1 = new MapBackedCache<>(new HashMap<>()) ;
final JavaBinCodec.StringCache STRING_CACHE = new JavaBinCodec.StringCache(cache1);
// STRING_CACHE = new JavaBinCodec.StringCache(cache);
byte[] bytes = new byte[0];
- JavaBinCodec.StringBytes stringBytes = new JavaBinCodec.StringBytes(null,0,0);
+ StringBytes stringBytes = new StringBytes(null,0,0);
for(int i=0;i<10000;i++) {
String s = String.valueOf(random().nextLong());
@@ -410,9 +410,9 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
int THREADS = 10;
runInThreads(THREADS, () -> {
- JavaBinCodec.StringBytes stringBytes1 = new JavaBinCodec.StringBytes(new byte[0], 0, 0);
+ StringBytes stringBytes1 = new StringBytes(new byte[0], 0, 0);
for (int i = 0; i < ITERS; i++) {
- JavaBinCodec.StringBytes b = l.get(i % l.size());
+ StringBytes b = l.get(i % l.size());
stringBytes1.reset(b.bytes, 0, b.bytes.length);
if (STRING_CACHE.get(stringBytes1) == null) throw new RuntimeException("error");
}
@@ -429,7 +429,7 @@ public class TestJavaBinCodec extends SolrTestCaseJ4 {
String a = null;
CharArr arr = new CharArr();
for (int i = 0; i < ITERS; i++) {
- JavaBinCodec.StringBytes sb = l.get(i % l.size());
+ StringBytes sb = l.get(i % l.size());
arr.reset();
ByteUtils.UTF8toUTF16(sb.bytes, 0, sb.bytes.length, arr);
a = arr.toString();
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/Utf8CharSequenceTest.java b/solr/solrj/src/test/org/apache/solr/common/util/Utf8CharSequenceTest.java
index bf3fd26..0483293 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/Utf8CharSequenceTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/Utf8CharSequenceTest.java
@@ -83,7 +83,7 @@ public class Utf8CharSequenceTest extends SolrTestCaseJ4 {
NamedList nl1 = (NamedList) new JavaBinCodec()
.setReadStringAsCharSeq(true)
- .unmarshal(new FastInputStream(null, bytes, 0, bytes.length));
+ .unmarshal(new ByteArrayInputStream( bytes, 0, bytes.length));
byte[] buf = ((ByteArrayUtf8CharSequence) nl1.getVal(0)).getBuf();
ByteArrayUtf8CharSequence valLong = (ByteArrayUtf8CharSequence) nl1.get("key_long");
assertFalse(valLong.getBuf() == buf);