Posted to reviews@spark.apache.org by michalsenkyr <gi...@git.apache.org> on 2017/02/02 22:53:44 UTC

[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

Github user michalsenkyr commented on the issue:

    https://github.com/apache/spark/pull/16541
  
    Apologies for taking so long.
    
    I tried modifying the serialization logic as best I could to serialize into `UnsafeArrayData` ([branch diff](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe)). I had to first convert the collection into an array in order to use `fromPrimitiveArray` on the result (see the sketch after the benchmark). That extra copy is probably the reason why the benchmark came up slightly worse:
    
    ```
    OpenJDK 64-Bit Server VM 1.8.0_121-b13 on Linux 4.9.6-1-ARCH
    AMD A10-4600M APU with Radeon(tm) HD Graphics
    collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    Seq                                            256 /  287          0,0      255670,1       1,0X
    List                                           161 /  220          0,0      161091,7       1,6X
    mutable.Queue                                  304 /  324          0,0      303823,3       0,8X
    ```
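    
    Roughly speaking, the conversion I'm describing amounts to the following (a minimal hand-written sketch, not the actual Catalyst codegen; in the branch it sits behind the `fromIntegerSeq` helper that the generated serializer calls):
    
    ```
    import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
    
    // The Seq -> Array copy here is the suspected source of the overhead in the
    // numbers above; fromPrimitiveArray then copies once more into the unsafe format.
    def fromIntSeq(xs: Seq[Int]): UnsafeArrayData =
      UnsafeArrayData.fromPrimitiveArray(xs.toArray)
    ```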
    
    I am not entirely sure how `GenericArrayData` and `UnsafeArrayData` are handled during transformations and shuffles, though, so it's possible that more complex tests would reveal better performance. However, I don't think I can test that properly on my single-machine setup. I'd definitely be interested in benchmark results on a cluster; a rough idea of what such a test might look like is sketched below.
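    
    Purely as an illustration (assumed, not part of the PR; `spark` is an existing SparkSession), something along these lines would force a shuffle so the array representation actually has to cross a stage boundary:
    
    ```
    import spark.implicits._
    
    // Hypothetical cluster-scale check: build a Dataset of Seq[Int], shuffle it,
    // then touch the collections again so both the serializer and deserializer
    // paths are exercised. Sizes and partition counts are made up.
    val ds = spark.createDataset(1 to 1000000).map(i => Seq(i, i + 1, i + 2))
    ds.repartition(200).map(_.sum).count()
    ```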
    
    Generated code:
    
    ```
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private boolean CollectObjects_loopIsNull1;
    /* 010 */   private int CollectObjects_loopValue0;
    /* 011 */   private UnsafeRow deserializetoobject_result;
    /* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
    /* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
    /* 014 */   private scala.collection.immutable.List mapelements_argValue;
    /* 015 */   private UnsafeRow mapelements_result;
    /* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
    /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
    /* 018 */   private scala.collection.immutable.List serializefromobject_argValue;
    /* 019 */   private UnsafeRow serializefromobject_result;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
    /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
    /* 023 */
    /* 024 */   public GeneratedIterator(Object[] references) {
    /* 025 */     this.references = references;
    /* 026 */   }
    /* 027 */
    /* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 029 */     partitionIndex = index;
    /* 030 */     this.inputs = inputs;
    /* 031 */     inputadapter_input = inputs[0];
    /* 032 */
    /* 033 */     deserializetoobject_result = new UnsafeRow(1);
    /* 034 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
    /* 035 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
    /* 036 */
    /* 037 */     mapelements_result = new UnsafeRow(1);
    /* 038 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
    /* 039 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
    /* 040 */
    /* 041 */     serializefromobject_result = new UnsafeRow(1);
    /* 042 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
    /* 043 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
    /* 044 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
    /* 045 */
    /* 046 */   }
    /* 047 */
    /* 048 */   protected void processNext() throws java.io.IOException {
    /* 049 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 050 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 051 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 052 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
    /* 053 */
    /* 054 */       scala.collection.immutable.List deserializetoobject_value = null;
    /* 055 */
    /* 056 */       if (!inputadapter_isNull) {
    /* 057 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
    /* 058 */         scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder();
    /* 059 */         CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength);
    /* 060 */
    /* 061 */         int deserializetoobject_loopIndex = 0;
    /* 062 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
    /* 063 */           CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
    /* 064 */           CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
    /* 065 */
    /* 066 */           if (CollectObjects_loopIsNull1) {
    /* 067 */             throw new RuntimeException(((java.lang.String) references[0]));
    /* 068 */           }
    /* 069 */           if (false) {
    /* 070 */             CollectObjects_builderValue2.$plus$eq(null);
    /* 071 */           } else {
    /* 072 */             CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0);
    /* 073 */           }
    /* 074 */
    /* 075 */           deserializetoobject_loopIndex += 1;
    /* 076 */         }
    /* 077 */
    /* 078 */         deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result();
    /* 079 */       }
    /* 080 */
    /* 081 */       boolean mapelements_isNull = true;
    /* 082 */       scala.collection.immutable.List mapelements_value = null;
    /* 083 */       if (!false) {
    /* 084 */         mapelements_argValue = deserializetoobject_value;
    /* 085 */
    /* 086 */         mapelements_isNull = false;
    /* 087 */         if (!mapelements_isNull) {
    /* 088 */           Object mapelements_funcResult = null;
    /* 089 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
    /* 090 */           if (mapelements_funcResult == null) {
    /* 091 */             mapelements_isNull = true;
    /* 092 */           } else {
    /* 093 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
    /* 094 */           }
    /* 095 */
    /* 096 */         }
    /* 097 */         mapelements_isNull = mapelements_value == null;
    /* 098 */       }
    /* 099 */
    /* 100 */       if (mapelements_isNull) {
    /* 101 */         throw new RuntimeException(((java.lang.String) references[2]));
    /* 102 */       }
    /* 103 */       serializefromobject_argValue = mapelements_value;
    /* 104 */
    /* 105 */       boolean serializefromobject_isNull = false;
    /* 106 */       final ArrayData serializefromobject_value = false ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromIntegerSeq(serializefromobject_argValue);
    /* 107 */       serializefromobject_isNull = serializefromobject_value == null;
    /* 108 */       serializefromobject_holder.reset();
    /* 109 */
    /* 110 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 111 */
    /* 112 */       if (serializefromobject_isNull) {
    /* 113 */         serializefromobject_rowWriter.setNullAt(0);
    /* 114 */       } else {
    /* 115 */         // Remember the current cursor so that we can calculate how many bytes are
    /* 116 */         // written later.
    /* 117 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
    /* 118 */
    /* 119 */         if (serializefromobject_value instanceof UnsafeArrayData) {
    /* 120 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
    /* 121 */           // grow the global buffer before writing data.
    /* 122 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
    /* 123 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
    /* 124 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
    /* 125 */
    /* 126 */         } else {
    /* 127 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
    /* 128 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
    /* 129 */
    /* 130 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
    /* 131 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
    /* 132 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
    /* 133 */             } else {
    /* 134 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
    /* 135 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
    /* 136 */             }
    /* 137 */           }
    /* 138 */         }
    /* 139 */
    /* 140 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
    /* 141 */       }
    /* 142 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
    /* 143 */       append(serializefromobject_result);
    /* 144 */       if (shouldStop()) return;
    /* 145 */     }
    /* 146 */   }
    /* 147 */ }
    ```
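    
    For reference, I believe a dump like the one above can be reproduced with the codegen debug helpers (here `ds` stands for a `Dataset` of the collection type being tested, e.g. `Dataset[Seq[Int]]`, with `spark.implicits._` in scope):
    
    ```
    import org.apache.spark.sql.execution.debug._
    
    // Prints the generated whole-stage-codegen Java source for each subtree.
    ds.map(identity).debugCodegen()
    ```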

