You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/11/24 03:37:47 UTC

lucene-solr:branch_6x: SOLR-9721: javabin Tuple parser for streaming and other end points

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 8a5bb4666 -> 28fd42bf4


SOLR-9721: javabin Tuple parser for streaming and other end points


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/28fd42bf
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/28fd42bf
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/28fd42bf

Branch: refs/heads/branch_6x
Commit: 28fd42bf4ab47c130d814e1abd3e318f29ea66d7
Parents: 8a5bb46
Author: Noble Paul <no...@apache.org>
Authored: Thu Nov 24 09:00:21 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Thu Nov 24 09:02:28 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/handler/ExportWriter.java   |  13 +-
 .../apache/solr/response/SmileWriterTest.java   |  17 +-
 .../response/TestJavabinTupleStreamParser.java  | 195 +++++++++++++++++++
 .../io/stream/JavabinTupleStreamParser.java     | 189 ++++++++++++++++++
 .../solr/client/solrj/io/stream/SolrStream.java |   9 +-
 .../org/apache/solr/common/SolrDocument.java    |   4 +
 .../apache/solr/common/util/JavaBinCodec.java   |  12 +-
 .../client/solrj/io/stream/StreamingTest.java   |   2 +-
 9 files changed, 429 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 1b1a9ed..82e3e8e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -81,6 +81,8 @@ New Features
 * SOLR-9324: Support Secure Impersonation / Proxy User for solr authentication
   (Gregory Chanan, Hrishikesh Gadre via yonik)
 
+* SOLR-9721: javabin Tuple parser for streaming and other end points (noble)
+
 Optimizations
 ----------------------
 * SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
index da3e9c0..5acfa2f 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
@@ -52,10 +52,13 @@ import org.apache.solr.common.MapWriter.EntryWriter;
 import org.apache.solr.common.PushWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.response.BinaryResponseWriter;
 import org.apache.solr.response.JSONResponseWriter;
+import org.apache.solr.response.QueryResponseWriter;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.BoolField;
 import org.apache.solr.schema.FieldType;
@@ -126,8 +129,14 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   }
 
   public void write(OutputStream os) throws IOException {
-    respWriter = new OutputStreamWriter(os, StandardCharsets.UTF_8);
-    writer = JSONResponseWriter.getPushWriter(respWriter, req, res);
+    QueryResponseWriter rw = req.getCore().getResponseWriters().get(wt);
+    if (rw instanceof BinaryResponseWriter) {
+      //todo add support for other writers after testing
+      writer = new JavaBinCodec(os, null);
+    } else {
+      respWriter = new OutputStreamWriter(os, StandardCharsets.UTF_8);
+      writer = JSONResponseWriter.getPushWriter(respWriter, req, res);
+    }
     Exception exception = res.getException();
     if (exception != null) {
       if (!(exception instanceof IgnoreException)) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java b/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java
index 77c263a..871f8c4 100644
--- a/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/response/SmileWriterTest.java
@@ -147,13 +147,8 @@ public class SmileWriterTest extends SolrTestCaseJ4 {
 
   @Test
   public void test10Docs() throws IOException {
-    SolrDocumentList l = new SolrDocumentList();
-    for(int i=0;i<10; i++){
-      l.add(sampleDoc(random(), i));
-    }
-
     SolrQueryResponse response = new SolrQueryResponse();
-    response.getValues().add("results", l);
+    SolrDocumentList l = constructSolrDocList(response);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     new SmileResponseWriter().write(baos, new LocalSolrQueryRequest(null, new ModifiableSolrParams()), response);
 
@@ -171,6 +166,16 @@ public class SmileWriterTest extends SolrTestCaseJ4 {
 
   }
 
+  public static SolrDocumentList constructSolrDocList(SolrQueryResponse response) { // builds a 10-document list, adds it to the response under "results", and returns it
+    SolrDocumentList l = new SolrDocumentList();
+    for(int i=0;i<10; i++){
+      l.add(sampleDoc(random(), i)); // one sample document per index, built from the test's random()
+    }
+
+    response.getValues().add("results", l); // side effect on the caller's response
+    return l;
+  }
+
   public static SolrDocument sampleDoc(Random r, int bufnum) {
     SolrDocument sdoc = new SolrDocument();
     sdoc.put("id", "my_id_" + bufnum);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java b/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java
new file mode 100644
index 0000000..d710446
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/response/TestJavabinTupleStreamParser.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.response;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.JavabinTupleStreamParser;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+
+import static org.apache.solr.response.SmileWriterTest.constructSolrDocList;
+
+public class TestJavabinTupleStreamParser extends SolrTestCaseJ4 {
+
+  public void testKnown() throws IOException { // serializes a hand-written JSON response and checks that the parser yields its two docs, then null
+    String payload = "{\n" +
+        "  \"responseHeader\":{\n" +
+        "    \"zkConnected\":true,\n" +
+        "    \"status\":0,\n" +
+        "    \"QTime\":46},\n" +
+        "  \"response\":{\n" +
+        "    \"numFound\":2,\n" +
+        "    \"start\":0,\n" +
+        "    \"docs\":[\n" +
+        "      {\n" +
+        "        \"id\":\"2\",\n" +
+        "        \"a_s\":\"hello2\",\n" +
+        "        \"a_i\":2,\n" +
+        "        \"a_f\":0.0},\n" +
+        "      {\n" +
+        "        \"id\":\"3\",\n" +
+        "        \"a_s\":\"hello3\",\n" +
+        "        \"a_i\":3,\n" +
+        "        \"a_f\":3.0}]}}";
+    SimpleOrderedMap nl = convert2OrderedMap((Map) Utils.fromJSONString(payload)); // lists become iterators, nested maps become SimpleOrderedMap
+
+    byte[] bytes = serialize(nl); // javabin bytes with nl wrapped under "results"
+
+    JavabinTupleStreamParser parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), true); // true => onlyJsonTypes
+    Map<String, Object> map = parser.next();
+    assertEquals("2", map.get("id"));
+    map = parser.next();
+    assertEquals("3", map.get("id"));
+    System.out.println(); // NOTE(review): stray blank-line print; it does not affect the assertions
+    map = parser.next();
+    assertNull(map); // the parser signals exhaustion with null
+
+  }
+
+  public SimpleOrderedMap convert2OrderedMap(Map m) { // recursively copies m into a SimpleOrderedMap, entry by entry
+    SimpleOrderedMap result = new SimpleOrderedMap<>();
+    m.forEach((k, v) -> {
+      if (v instanceof List) v = ((List) v).iterator(); // lists are stored as iterators; presumably so they are serialized in iterator form - TODO confirm
+      if (v instanceof Map) v = convert2OrderedMap((Map) v); // nested maps are converted the same way
+      result.add((String) k, v); // keys are assumed to be Strings (cast fails otherwise)
+    });
+    return result;
+
+  }
+
+  public void testSimple() throws IOException { // serializes a stub TupleStream and reads it back with onlyJsonTypes on and off
+    List<Map<String, Object>> l = new ArrayList();
+    l.add(Utils.makeMap("id", 1, "f", 1.0f, "s", "Some str 1"));
+    l.add(Utils.makeMap("id", 2, "f", 2.0f, "s", "Some str 2"));
+    l.add(Utils.makeMap("id", 3, "f", 1.0f, "s", "Some str 3"));
+    l.add(Utils.makeMap("EOF", true, "RESPONSE_TIME", 206, "sleepMillis", 1000)); // trailing EOF tuple
+    Iterator<Map<String, Object>> iterator = l.iterator();
+    TupleStream tupleStream = new TupleStream() { // minimal stub: only read() and toExplanation() do anything
+      @Override
+      public void setStreamContext(StreamContext context) {
+
+      }
+
+      @Override
+      public List<TupleStream> children() {
+        return null;
+      }
+
+      @Override
+      public void open() throws IOException {
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public Tuple read() throws IOException {
+        if (iterator.hasNext()) return new Tuple(iterator.next()); // replay the maps above, one Tuple each
+        else return null; // null once the list is exhausted
+      }
+
+      @Override
+      public StreamComparator getStreamSort() {
+        return null;
+      }
+
+      @Override
+      public Explanation toExplanation(StreamFactory factory) throws IOException {
+        return new StreamExplanation(getStreamNodeId().toString())
+            .withFunctionName("Dummy")
+            .withImplementingClass(this.getClass().getName())
+            .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
+            .withExpression("--non-expressible--");
+      }
+    };
+
+    byte[] bytes = serialize(tupleStream); // the same bytes are parsed twice below
+    JavabinTupleStreamParser parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), true); // onlyJsonTypes=true: values are expected back as Long/Double
+    Map m = parser.next();
+    assertEquals(1L, m.get("id"));
+    assertEquals(1.0, (Double) m.get("f"), 0.01);
+    m = parser.next();
+    assertEquals(2L, m.get("id"));
+    assertEquals(2.0, (Double) m.get("f"), 0.01);
+    m = parser.next();
+    assertEquals(3L, m.get("id"));
+    assertEquals(1.0, (Double) m.get("f"), 0.01);
+    m = parser.next();
+    assertEquals(Boolean.TRUE, m.get("EOF"));
+
+    parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), false); // onlyJsonTypes=false: the original Integer/Float types are expected back
+    m = parser.next();
+    assertEquals(1, m.get("id"));
+    assertEquals(1.0, (Float) m.get("f"), 0.01);
+    m = parser.next();
+    assertEquals(2, m.get("id"));
+    assertEquals(2.0, (Float) m.get("f"), 0.01);
+    m = parser.next();
+    assertEquals(3, m.get("id"));
+    assertEquals(1.0, (Float) m.get("f"), 0.01);
+    m = parser.next();
+    assertEquals(Boolean.TRUE, m.get("EOF"));
+  }
+
+  public void testSolrDocumentList() throws IOException { // round-trips a SolrDocumentList through javabin and compares each parsed map with its source document
+    SolrQueryResponse response = new SolrQueryResponse();
+    SolrDocumentList l = constructSolrDocList(response); // also registers l on the response under "results"
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    new JavaBinCodec().marshal(response.getValues(), baos); // NOTE(review): baos is never read afterwards; the bytes used below come from serialize()
+    byte[] bytes = serialize(response.getValues());
+    Object o = new JavaBinCodec().unmarshal(new ByteArrayInputStream(bytes)); // NOTE(review): o is never used in this method
+    List list = new ArrayList<>();
+    Map m = null;
+    JavabinTupleStreamParser parser = new JavabinTupleStreamParser(new ByteArrayInputStream(bytes), false); // false => onlyJsonTypes off
+    while ((m = parser.next()) != null) { // drain the parser until it reports exhaustion
+      list.add(m);
+    }
+    assertEquals(l.size(), list.size());
+    for(int i =0;i<list.size();i++){
+      compareSolrDocument(l.get(i),new SolrDocument((Map<String, Object>) list.get(i))); // wrap each parsed map in a SolrDocument for the comparison
+    }
+
+  }
+  public static byte[] serialize(Object o) throws IOException { // returns the javabin bytes of a fresh response whose values hold o under "results"
+    SolrQueryResponse response = new SolrQueryResponse();
+    response.getValues().add("results", o);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    new JavaBinCodec().marshal(response.getValues(), baos); // a new codec instance per call
+    return baos.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java
new file mode 100644
index 0000000..dfe8cc7
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JavabinTupleStreamParser.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.util.DataInputInputStream;
+import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+
+public class JavabinTupleStreamParser extends JavaBinCodec implements TupleStreamParser {
+  private final InputStream is;
+  final FastInputStream fis;
+  private int arraySize = Integer.MAX_VALUE;
+  private boolean onlyJsonTypes = false;
+  int objectSize;
+
+
+  public JavabinTupleStreamParser(InputStream is, boolean onlyJsonTypes) throws IOException { // wraps the stream and advances it to the first document
+    this.onlyJsonTypes = onlyJsonTypes; // when true, readObject widens numeric types and converts dates to strings
+    this.is = is; // kept only so close() can close it
+    this.fis = initRead(is); // inherited from JavaBinCodec; returns the stream that all further reads use
+    if (!readTillDocs()) arraySize = 0; // no document collection found: next() returns null immediately
+  }
+
+
+  private boolean readTillDocs() throws IOException { // scans the stream depth-first for the document collection; true once fis is positioned at its first element
+    if (isObjectType(fis)) { // consumes the next tag byte; for map-like tags it also sets objectSize
+      if (tagByte == SOLRDOCLST) {
+        readVal(fis);// the list metadata comes first; read it and throw it away
+        tagByte = fis.readByte(); // tag byte of the value that follows the metadata
+        arraySize = readSize(fis); // number of documents that next() may return
+        return true;
+      }
+      for (int i = objectSize; i > 0; i--) { // walk the entries of the current map-like value
+        Object k = readVal(fis); // entry name
+        if (k == END_OBJ) break; // end marker; this is what terminates a MAP_ENTRY_ITER, whose objectSize is Integer.MAX_VALUE
+        if ("docs".equals(k)) {
+          tagByte = fis.readByte();
+          if (tagByte == ITERATOR) return true;// "docs" as an iterator: no size is read here, next() stops at END_OBJ
+          if (tagByte >>> 5 == ARR >>> 5) {// "docs" as an array: its size bounds next()
+            arraySize = readSize(fis);
+            return true;
+          }
+          return false; // "docs" holds some other type: give up
+        } else {
+          if (readTillDocs()) return true; // not "docs": descend into the value, or consume it if it is not map-like
+        }
+      }
+    } else {
+      readObject(fis); // isObjectType already consumed the tag, so decode and discard the rest of this value
+      return false;
+    }
+    return false;
+
+    //when this method returns true, the rest of the stream is read by next() as a sequence of maps
+  }
+
+  private boolean isObjectType(DataInputInputStream dis) throws IOException { // reads the next tag into tagByte; true if it starts a map-like value or a SolrDocumentList
+    tagByte = dis.readByte();
+    if (tagByte >>> 5 == ORDERED_MAP >>> 5 || // these two tags are matched on the bits above the low five only
+        tagByte >>> 5 == NAMED_LST >>> 5) {
+      objectSize = readSize(dis); // entry count, consumed by the loop in readTillDocs
+      return true;
+    }
+    if (tagByte == MAP) {
+      objectSize = readVInt(dis); // for MAP the entry count is read as a separate vint
+      return true;
+    }
+    if (tagByte == MAP_ENTRY_ITER) {
+      objectSize = Integer.MAX_VALUE; // no size is read for this tag; readTillDocs stops at END_OBJ instead
+      return true;
+    }
+    return tagByte == SOLRDOCLST; // objectSize is left untouched for SOLRDOCLST
+  }
+
+  private Map readAsMap(DataInputInputStream dis) throws IOException { // reads name/value pairs into an insertion-ordered map; used by readObject for ORDERED_MAP and NAMED_LST
+    int sz = readSize(dis); // number of pairs to read
+    Map m = new LinkedHashMap<>();
+    for (int i = 0; i < sz; i++) {
+      String name = (String) readVal(dis);
+      Object val = readVal(dis);
+      m.put(name, val); // a repeated name keeps only its last value
+    }
+    return m;
+  }
+
+  private Map readSolrDocumentAsMap(DataInputInputStream dis) throws IOException { // decodes a SOLRDOC value into an insertion-ordered map instead of a SolrDocument
+    tagByte = dis.readByte(); // tag of the value following the SOLRDOC tag; presumably readSize derives the entry count from it - TODO confirm
+    int size = readSize(dis);
+    Map doc = new LinkedHashMap<>();
+    for (int i = 0; i < size; i++) {
+      String fieldName;
+      Object obj = readVal(dis); // could be a field name, or a child document
+      if (obj instanceof Map) { // nested SOLRDOC values arrive as Maps because readObject decodes them with this method
+        List l = (List) doc.get("_childDocuments_");
+        if (l == null) doc.put("_childDocuments_", l = new ArrayList()); // create the child list on first use
+        l.add(obj);
+        continue; // a child document has no separate value to read
+      } else {
+        fieldName = (String) obj;
+      }
+      Object fieldVal = readVal(dis);
+      doc.put(fieldName, fieldVal);
+    }
+    return doc;
+  }
+
+  @Override
+  protected Object readObject(DataInputInputStream dis) throws IOException { // decodes the value for the current tagByte, optionally restricting results to JSON-style types
+    if (tagByte == SOLRDOC) {
+      return readSolrDocumentAsMap(dis); // documents become plain maps in both modes
+    }
+    if (onlyJsonTypes) {
+      switch (tagByte >>> 5) { // tags matched on the bits above the low five; unmatched tags fall through to the next switch
+        case SINT >>> 5:
+          int i = readSmallInt(dis);
+          return (long) i; // widened so every integer comes back as a Long
+        case ORDERED_MAP >>> 5:
+          return readAsMap(dis);
+        case NAMED_LST >>> 5:
+          return readAsMap(dis);
+      }
+
+      switch (tagByte) { // tags matched on the whole byte
+        case INT: {
+          int i = dis.readInt();
+          return (long) i;
+        }
+        case FLOAT: {
+          float v = dis.readFloat();
+          return (double) v; // widened so every floating-point value comes back as a Double
+        }
+        case BYTE: {
+          byte b = dis.readByte();
+          return (long) b;
+        }
+        case SHORT: {
+          short s = dis.readShort();
+          return (long) s;
+        }
+
+        case DATE: {
+          return Instant.ofEpochMilli(dis.readLong()).toString(); // epoch millis rendered as an ISO-8601 string
+        }
+
+        default:
+          return super.readObject(dis); // every other tag uses the stock JavaBinCodec decoding
+      }
+    } else return super.readObject(dis); // native-type mode: only SOLRDOC is handled specially
+  }
+
+
+  @Override
+  public Map<String, Object> next() throws IOException { // returns the next document as a map, or null once the documents are exhausted
+    if (arraySize == 0) return null; // nothing left, or no docs were found; do not touch the stream again
+    Object o = readVal(fis);
+    arraySize = (o == END_OBJ) ? 0 : arraySize - 1; // END_OBJ ends an iterator-style list: record it so later calls return null instead of reading past the marker
+    if (o == END_OBJ) return null;
+    return (Map<String, Object>) o;
+  }
+
+  @Override
+  public void close() throws IOException { // closes the InputStream that was passed to the constructor
+    is.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 6a21703..f132815 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -274,15 +274,18 @@ public class SolrStream extends TupleStream {
     }
 
     String wt = requestParams.get(CommonParams.WT, "json");
-    assert CommonParams.JSON.equals(wt);
     QueryRequest query = new QueryRequest(requestParams);
     query.setPath(p);
     query.setResponseParser(new InputStreamResponseParser(wt));
     query.setMethod(SolrRequest.METHOD.POST);
     NamedList<Object> genericResponse = server.request(query);
     InputStream stream = (InputStream) genericResponse.get("stream");
-    InputStreamReader reader = new InputStreamReader(stream, "UTF-8");
-    return new JSONTupleStream(reader);
+    if (CommonParams.JAVABIN.equals(wt)) {
+      return new JavabinTupleStreamParser(stream, true);
+    } else {
+      InputStreamReader reader = new InputStreamReader(stream, "UTF-8");
+      return new JSONTupleStream(reader);
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java b/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
index b4af06f..2e69b82 100644
--- a/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
+++ b/solr/solrj/src/java/org/apache/solr/common/SolrDocument.java
@@ -50,6 +50,10 @@ public class SolrDocument extends SolrDocumentBase<Object, SolrDocument> impleme
     _fields = new LinkedHashMap<>();
   }
 
+  public SolrDocument(Map<String, Object> fields) { // creates a document backed directly by the given map
+    this._fields = fields; // stored by reference, not copied: later changes to the map are visible through this document
+  }
+
   /**
    * @return a list of field names defined in this document - this Collection is directly backed by this SolrDocument.
    * @see #keySet

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
index 32ccf36..3e054d7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
@@ -169,6 +169,11 @@ public class JavaBinCodec implements PushWriter {
   byte version;
 
   public Object unmarshal(InputStream is) throws IOException { // reads one complete value from a javabin stream
+    FastInputStream dis = initRead(is); // stream setup is delegated to initRead so subclasses can reuse it
+    return readVal(dis);
+  }
+
+  protected FastInputStream initRead(InputStream is) throws IOException {
     assert !alreadyUnmarshalled;
     FastInputStream dis = FastInputStream.wrap(is);
     version = dis.readByte();
@@ -176,9 +181,9 @@ public class JavaBinCodec implements PushWriter {
       throw new RuntimeException("Invalid version (expected " + VERSION +
           ", but " + version + ") or the data in not in 'javabin' format");
     }
-    
+
     alreadyUnmarshalled = true;
-    return readVal(dis);
+    return dis;
   }
 
 
@@ -243,7 +248,10 @@ public class JavaBinCodec implements PushWriter {
 
   public Object readVal(DataInputInputStream dis) throws IOException { // reads one tag byte, then decodes the value that follows it
     tagByte = dis.readByte();
+    return readObject(dis); // decoding lives in the protected readObject so subclasses can override it
+  }
 
+  protected Object readObject(DataInputInputStream dis) throws IOException {
     // if ((tagByte & 0xe0) == 0) {
     // if top 3 bits are clear, this is a normal tag
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/28fd42bf/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 99d810c..f84d6a8 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -1977,7 +1977,7 @@ public void testTrace() throws Exception {
     for (int idx = 0; idx < vals.length; idx += 2) {
       params.add(vals[idx], vals[idx + 1]);
     }
-
+    if(random().nextBoolean()) params.add("wt","javabin");
     return params;
   }