Posted to dev@systemml.apache.org by Anthony Thomas <ah...@eng.ucsd.edu> on 2017/12/21 23:00:05 UTC

Passing a CoordinateMatrix to SystemML

Hi SystemML folks,

I'm trying to pass some data from Spark to a DML script via the MLContext
API. The data is derived from a parquet file containing a dataframe with
the schema: [label: Integer, features: SparseVector]. I am doing the
following:

        val input_data = spark.read.parquet(inputPath)
        val x = input_data.select("features")
        val y = input_data.select("y")
        val x_meta = new MatrixMetadata(DF_VECTOR)
        val y_meta = new MatrixMetadata(DF_DOUBLES)
        val script = dmlFromFile(s"${script_path}/script.dml").
                in("X", x, x_meta).
                in("Y", y, y_meta)
        ...

However, this results in an error from SystemML:
java.lang.ArrayIndexOutOfBoundsException: 0
I'm guessing this has something to do with SparkML being zero indexed and
SystemML being 1 indexed. Is there something I should be doing differently
here? Note that I also tried converting the dataframe to a CoordinateMatrix
and then creating an RDD[String] in IJV format. That too resulted in
"ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
I'm doing wrong here, but I haven't been able to figure out exactly what.
Please let me know if you need more information (I can send along the full
error stacktrace if that would be helpful)!

Thanks,

Anthony
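
For reference, the IJV route mentioned above amounts to emitting 1-based
(row, column, value) triples. A minimal sketch, assuming the "input_data"
dataframe from the snippet above and an ml.linalg vector column named
"features" (the resulting RDD[String] would still need row/column/nnz
metadata when handed to MLContext):

        import org.apache.spark.ml.linalg.Vector

        val ijv = input_data.select("features").rdd.zipWithIndex().flatMap { case (row, i) =>
          val v = row.getAs[Vector](0).toSparse
          // SystemML's text (IJV) format is 1-indexed, hence the +1 on both indices
          v.indices.zip(v.values).map { case (j, value) => s"${i + 1} ${j + 1} $value" }
        }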

Re: Passing a CoordinateMatrix to SystemML

Posted by Anthony Thomas <ah...@eng.ucsd.edu>.
Hey Matthias,

Just wanted to confirm that the patch above works for me - I'm now able to pass
a dataframe of sparse vectors to a DML script without issue. Sorry for the
slow confirmation on this - I've been out of the office for the last couple of
weeks. Thanks for your help debugging this!

Best,

Anthony

On Mon, Dec 25, 2017 at 5:35 AM, Matthias Boehm <mb...@gmail.com> wrote:

> ok that was very helpful - I just pushed two additional fixes which should
> resolve these issues. The underlying cause was an incorrect sparse row
> preallocation (to reduce GC overhead), which resulted in resizing issues
> for initial sizes of zero. These two patches fix the underlying issues,
> make both MCSR and COO more robust for such ultra-sparse cases, and improve
> the performance for converting ultra-sparse matrices. Thanks again for your
> help Anthony.
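
To illustrate the failure mode described above (purely illustrative, not
SystemML's actual code): a capacity-doubling append never grows a buffer whose
preallocated capacity is 0, because 0 * 2 stays 0, which is exactly the
ultra-sparse corner case the patches guard against.

        // sketch: append with a growth guard for zero initial capacity
        final class GrowableDoubles(initialCapacity: Int) {
          private var values = new Array[Double](initialCapacity)
          private var size = 0
          def append(v: Double): Unit = {
            if (size >= values.length)  // without max(1, ...) a capacity of 0 never grows
              values = java.util.Arrays.copyOf(values, math.max(1, values.length * 2))
            values(size) = v
            size += 1
          }
        }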
>
> As a side note: our default block size is 1000 but converting to 1024 is
> fine if you also set 'sysml.defaultblocksize' to 1024; otherwise there will
> be an unnecessary reblock (with shuffle) from block size 1024 to 1000 on
> the first access of this input.
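
In code, keeping the block size aligned with that default looks like the
following (a sketch reusing the dimensions from the driver snippet quoted
further down; the constructor arguments are rows, cols, rows per block, cols
per block, and non-zeros):

        // matches SystemML's default 1000x1000 blocking, so no reblock is needed
        val mc = new MatrixCharacteristics(199563535L, 71403L, 1000, 1000, 2444225947L)
        // alternatively, keep 1024x1024 blocks and set sysml.defaultblocksize to 1024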
>
> Regards,
> Matthias
>
>
> On 12/25/2017 3:07 AM, Anthony Thomas wrote:
>
>> Thanks Matthias - unfortunately I'm still running into an
>> ArrayIndexOutOfBounds exception both when reading the file as IJV and when
>> calling dataFrameToBinaryBlock. Just to confirm: I downloaded and compiled
>> the latest version using:
>>
>> git clone https://github.com/apache/systemml
>> cd systemml
>> mvn clean package
>>
>> mvn -version
>> Apache Maven 3.3.9
>> Maven home: /usr/share/maven
>> Java version: 1.8.0_151, vendor: Oracle Corporation
>> Java home: /usr/lib/jvm/java-8-oracle/jre
>> Default locale: en_US, platform encoding: UTF-8
>> OS name: "linux", version: "4.4.0-103-generic", arch: "amd64", family: "unix"
>>
>> I have a simple driver script written in Scala which calls the API methods.
>> I compile the script using SBT (version 1.0.4) and submit using
>> spark-submit (spark version 2.2.0). Here's how I'm calling the methods:
>>
>>         val x = spark.read.parquet(inputPath).select(featureNames)
>>         val mc = new MatrixCharacteristics(199563535L, 71403L, 1024, 1024,
>>           2444225947L) // as far as I know 1024x1024 is default block size in sysml?
>>         println("Reading Direct")
>>         val xrdd = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)
>>         xrdd.count
>>
>> here is the stacktrace from calling dataFrameToBinaryBlock:
>>
>>  java.lang.ArrayIndexOutOfBoundsException: 0
>>         at org.apache.sysml.runtime.matrix.data.SparseRowVector.append(SparseRowVector.java:196)
>>         at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:267)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:685)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1067)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:999)
>>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> and here is the stacktrace from calling "read()" directly:
>>
>> java.lang.ArrayIndexOutOfBoundsException: 2
>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.sort(SparseBlockCOO.java:399)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1784)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1687)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>         at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> Best,
>>
>> Anthony
>>
>>
>> On Sun, Dec 24, 2017 at 3:14 AM, Matthias Boehm <mb...@gmail.com>
>> wrote:
>>
>>> Thanks again for catching this issue Anthony - this IJV reblock issue with
>>> large ultra-sparse matrices is now fixed in master. It likely did not show
>>> up on the 1% sample because the data was small enough to read it directly
>>> into the driver.
>>>
>>> However, the dataFrameToBinaryBlock might be another issue that I could
>>> not reproduce yet, so it would be very helpful if you could give it another
>>> try. Thanks.
>>>
>>> Regards,
>>> Matthias
>>>
>>>
>>> On 12/24/2017 9:57 AM, Matthias Boehm wrote:
>>>
>>> Hi Anthony,
>>>>
>>>> thanks for helping to debug this issue. There are no limits other than
>>>> the dimensions and number of non-zeros being of type long. It sounds
>>>> more like an issue of converting special cases of ultra-sparse
>>>> matrices. I'll try to reproduce this issue and give an update as soon as
>>>> I know more. In the meantime, could you please (a) also provide the
>>>> stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b)
>>>> try calling your IJV conversion script via spark submit to exclude that
>>>> this issue is API-related? Thanks.
>>>>
>>>> Regards,
>>>> Matthias
>>>>
>>>> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
>>>>
>>>>> Okay, thanks for the suggestions - I upgraded to 1.0 and tried providing
>>>>> dimensions and blocksizes to dataFrameToBinaryBlock, both without success.
>>>>> I additionally wrote out the matrix to hdfs in IJV format and am still
>>>>> getting the same error when calling "read()" directly in the DML. However,
>>>>> I created a 1% sample of the original data in IJV format and SystemML was
>>>>> able to read the smaller file without any issue. This would seem to suggest
>>>>> that either there is some corruption in the full file or I'm running into
>>>>> some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
>>>>> 2.4e9 nonzero values, but this seems like it should be well within the
>>>>> limits of what SystemML/Spark can handle. I also checked for obvious data
>>>>> errors (e.g., the file not being 1-indexed or containing blank lines). In
>>>>> case it's helpful, the stacktrace from reading the data from hdfs in IJV
>>>>> format is attached. Thanks again for your help - I really appreciate it.
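
A quick sanity check of the 1-based indexing and blank-line concerns on the
hdfs file could look like this (a sketch; the path is a placeholder and
"spark" is the usual SparkSession):

        val ijvLines = spark.sparkContext.textFile("hdfs:///path/to/X.ijv")
        val minIndex = ijvLines.filter(_.trim.nonEmpty)         // skip blank lines
          .map(_.trim.split("\\s+"))
          .map(t => math.min(t(0).toLong, t(1).toLong))
          .min()                                                // SystemML expects >= 1
        println(s"smallest row/col index: $minIndex")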
>>>>>
>>>>>  00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126, 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>>>>>         at java.lang.System.arraycopy(Native Method)
>>>>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>>>>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>>>>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>>>>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>>>>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>>>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>>>>         at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>>>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>>>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>>>>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>>>>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>>>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> Anthony
>>>>>
>>>>>
>>>>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mb...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Given the line numbers from the stacktrace, it seems that you are using a
>>>>>> rather old version of SystemML. Hence, I would recommend upgrading to
>>>>>> SystemML 1.0 or at least 0.15 first.
>>>>>>
>>>>>> If the error persists or you're not able to upgrade, please try to call
>>>>>> dataFrameToBinaryBlock with provided matrix characteristics of dimensions
>>>>>> and blocksizes. The issue you've shown usually originates from incorrect
>>>>>> metadata (e.g., negative number of columns or block sizes), which prevents
>>>>>> the sparse rows from growing to the necessary sizes.
>>>>>>
>>>>>> Regards,
>>>>>> Matthias
>>>>>>
>>>>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>>>
>>>>>>> Thanks for the help! In response to your questions:
>>>>>>>
>>>>>>>    1. Sorry - this was a typo: the correct schema is: [y: int, features:
>>>>>>>    vector] - the column "features" was created using Spark's VectorAssembler
>>>>>>>    and the underlying type is an org.apache.spark.ml.linalg.SparseVector.
>>>>>>>    Calling x.schema results in: org.apache.spark.sql.types.StructType =
>>>>>>>    StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)
>>>>>>>    2. "y" converts fine - it appears the only issue is with X. The script
>>>>>>>    still crashes when running "print(sum(X))". The full stack trace is
>>>>>>>    attached at the end of the message.
>>>>>>>    3. Unfortunately, the error persists when calling
>>>>>>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>>>>>    4. Also, just in case this matters: I'm packaging the script into a jar
>>>>>>>    using SBT assembly and submitting via spark-submit.
>>>>>>>
>>>>>>> Here's an updated script:
>>>>>>>
>>>>>>>         val input_df = spark.read.parquet(inputPath)
>>>>>>>         val x = input_df.select(featureNames)
>>>>>>>         val y = input_df.select("y")
>>>>>>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>>>>>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>>>>>
>>>>>>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>>>>>         println("Reading X")
>>>>>>>         val res_x = ml.execute(script_x)
>>>>>>>
>>>>>>> Here is the output of the runtime plan generated by SystemML:
>>>>>>>
>>>>>>> # EXPLAIN (RUNTIME):
>>>>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>>>>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>>>>>> PROGRAM ( size CP/SP = 3/0 )
>>>>>>> --MAIN PROGRAM
>>>>>>> ----GENERIC (lines 1-2) [recompile=false]
>>>>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>>>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>>>>>> ------CP rmvar _Var0 _Var1
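
Runtime-level explain output like the above can be requested through
MLContext's explain setters (a sketch; assumes "ml" is the MLContext instance
and that setExplain/setExplainLevel are available in the version in use):

        import org.apache.sysml.api.mlcontext.MLContext

        ml.setExplain(true)
        ml.setExplainLevel(MLContext.ExplainLevel.RUNTIME)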
>>>>>>>
>>>>>>> And the resulting stack trace:
>>>>>>>
>>>>>>> 17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205, 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
>>>>>>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
>>>>>>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
>>>>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4 times; aborting job
>>>>>>> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException: Exception when executing script
>>>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>>>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>>>>>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
>>>>>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>>>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception occurred while executing runtime program
>>>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
>>>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
>>>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>>>>>>>     ... 14 more
>>>>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 2 -- Error evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>>>>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>>>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
>>>>>>>     ... 16 more
>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> well, let's do the following to figure this out:
>>>>>>>
>>>>>>>
>>>>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>>>>> please change the third line to val y = input_data.select("label").
>>>>>>>>
>>>>>>>> 2) For debugging, I would recommend to use a simple script like
>>>>>>>> "print(sum(X));" and try converting X and y separately to isolate the
>>>>>>>> problem.
>>>>>>>>
>>>>>>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>>>>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>>>>>
>>>>>>>> 4) As a workaround you might also call our internal converter directly via:
>>>>>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector),
>>>>>>>> where jsc is the java spark context, df is the dataset, mc are matrix
>>>>>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>>>>>> containsID indicates if the dataset contains a column "__INDEX" with the
>>>>>>>> row indexes, and isVector indicates if the passed dataset contains vectors
>>>>>>>> or basic types such as double.
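
Spelled out as a sketch (package paths as of SystemML 1.0 - check them against
your version; "spark" and the dataframe "x" are assumed from the driver
snippets above):

        import org.apache.spark.api.java.JavaSparkContext
        import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
        import org.apache.sysml.runtime.matrix.MatrixCharacteristics

        val jsc = new JavaSparkContext(spark.sparkContext)
        val mc  = new MatrixCharacteristics()  // dimensions unknown up front
        // containsID = false: no "__INDEX" column; isVector = true: the column holds vectors
        val xBinaryBlocks = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)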
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>>>>
>>>>>>>> Hi SystemML folks,
>>>>>>>>
>>>>>>>>
>>>>>>>>> I'm trying to pass some data from Spark to a DML script via the MLContext
>>>>>>>>> API. The data is derived from a parquet file containing a dataframe with
>>>>>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>>>>>> following:
>>>>>>>>>
>>>>>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>>>>>         val x = input_data.select("features")
>>>>>>>>>         val y = input_data.select("y")
>>>>>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>>>>                 in("X", x, x_meta).
>>>>>>>>>                 in("Y", y, y_meta)
>>>>>>>>>         ...
>>>>>>>>>
>>>>>>>>> However, this results in an error from SystemML:
>>>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>>>> I'm guessing this has something to do with SparkML being zero indexed and
>>>>>>>>> SystemML being 1 indexed. Is there something I should be doing differently
>>>>>>>>> here? Note that I also tried converting the dataframe to a CoordinateMatrix
>>>>>>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>>>>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
>>>>>>>>> I'm doing wrong here, but I haven't been able to figure out exactly what.
>>>>>>>>> Please let me know if you need more information (I can send along the full
>>>>>>>>> error stacktrace if that would be helpful)!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Anthony
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>

>>>>> java:280)
>>>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_
>>>>> ml_algorithms.scala:63)
>>>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:
>>>>> 160)
>>>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>>> NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
>>>>> deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
>>>>> SparkSubmit.scala:180)
>>>>>     at
>>>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:
>>>>> 119)
>>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException:
>>>>> Exception
>>>>> occurred while executing runtime program
>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>>>>> Program(
>>>>> ScriptExecutor.java:390)
>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.
>>>>> execute(ScriptExecutor.java:298)
>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>>>> java:303)
>>>>>     ... 14 more
>>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>>>> program block generated from statement block between lines 1 and 2 --
>>>>> Error
>>>>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>>     at org.apache.sysml.runtime.controlprogram.Program.
>>>>> execute(Program.java:130)
>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>>>>> Program(
>>>>> ScriptExecutor.java:388)
>>>>>     ... 16 more
>>>>> ...
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> well, let's do the following to figure this out:
>>>>>
>>>>>>
>>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>>> please change the third line to val y = input_data.select("label").
>>>>>>
>>>>>> 2) For debugging, I would recommend to use a simple script like
>>>>>> "print(sum(X));" and try converting X and y separately to isolate the
>>>>>> problem.
>>>>>>
>>>>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>>>
>>>>>> 4) As a workaround you might also call our internal converter directly
>>>>>> via:
>>>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
>>>>>> isVector),
>>>>>> where jsc is the java spark context, df is the dataset, mc are matrix
>>>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>>>> containsID indicates if the dataset contains a column "__INDEX"
>>>>>> with the
>>>>>> row indexes, and isVector indicates if the passed datasets contains
>>>>>> vectors
>>>>>> or basic types such as double.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Matthias
>>>>>>
>>>>>>
>>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>>
>>>>>> Hi SystemML folks,
>>>>>>
>>>>>>>
>>>>>>> I'm trying to pass some data from Spark to a DML script via the
>>>>>>> MLContext
>>>>>>> API. The data is derived from a parquet file containing a
>>>>>>> dataframe with
>>>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>>>> following:
>>>>>>>
>>>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>>>         val x = input_data.select("features")
>>>>>>>         val y = input_data.select("y")
>>>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>>                 in("X", x, x_meta).
>>>>>>>                 in("Y", y, y_meta)
>>>>>>>         ...
>>>>>>>
>>>>>>> However, this results in an error from SystemML:
>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>> I'm guessing this has something to do with SparkML being zero indexed
>>>>>>> and
>>>>>>> SystemML being 1 indexed. Is there something I should be doing
>>>>>>> differently
>>>>>>> here? Note that I also tried converting the dataframe to a
>>>>>>> CoordinateMatrix
>>>>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something
>>>>>>> simple
>>>>>>> I'm doing wrong here, but I haven't been able to figure out exactly
>>>>>>> what.
>>>>>>> Please let me know if you need more information (I can send along the
>>>>>>> full
>>>>>>> error stacktrace if that would be helpful)!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Anthony
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>

Re: Passing a CoordinateMatrix to SystemML

Posted by Matthias Boehm <mb...@gmail.com>.
Thanks again for catching this issue, Anthony - the IJV reblock issue 
with large ultra-sparse matrices is now fixed in master. It likely did 
not show up on the 1% sample because that data was small enough to be 
read directly into the driver.

However, the dataFrameToBinaryBlock path might be a separate issue that 
I could not reproduce yet, so it would be very helpful if you could give 
it another try, e.g., along the lines of the sketch below. Thanks.
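
Just as a reference point, a retry through the MLContext API with explicit 
dimensions could look roughly like the following. This is only a sketch: 
the exact MatrixMetadata constructor that accepts dimensions may differ 
across versions (please check the API of your build), and the dimensions 
below are placeholders based on the sizes you mentioned earlier in the thread.

        // assumes the existing MLContext instance `ml` and the DataFrame `x`
        // with the 'features' vector column from your earlier snippet
        import org.apache.sysml.api.mlcontext.MatrixMetadata
        import org.apache.sysml.api.mlcontext.MatrixFormat.DF_VECTOR
        import org.apache.sysml.api.mlcontext.ScriptFactory.dml

        // explicit dimensions (placeholders: ~1.9e8 rows x 7e4 cols)
        val meta_x = new MatrixMetadata(DF_VECTOR, 190000000L, 70000L)
        val script_x = dml("print(sum(X))").in("X", x, meta_x)
        ml.execute(script_x)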

Regards,
Matthias

On 12/24/2017 9:57 AM, Matthias Boehm wrote:
> Hi Anthony,
>
> thanks for helping to debug this issue. There are no limits other than
> the dimensions and number of non-zeros being of type long. It sounds
> more like an issue of converting special cases of ultra-sparse
> matrices. I'll try to reproduce this issue and give an update as soon as
> I know more. In the meantime, could you please (a) also provide the
> stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b)
> try calling your IJV conversion script via spark submit to exclude that
> this issue is API-related? Thanks.
>
> Regards,
> Matthias
>
> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
>> Okay thanks for the suggestions - I upgraded to 1.0 and tried providing
>> dimensions and blocksizes to dataFrameToBinaryBlock both without
>> success. I
>> additionally wrote out the matrix to hdfs in IJV format and am still
>> getting the same error when calling "read()" directly in the DML.
>> However,
>> I created a 1% sample of the original data in IJV format and SystemML was
>> able to read the smaller file without any issue. This would seem to
>> suggest
>> that either there is some corruption in the full file or I'm running into
>> some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
>> 2.4e9 nonzero values, but this seems like it should be well within the
>> limits of what SystemML/Spark can handle. I also checked for obvious data
>> errors (file is not 1 indexed or contains blank lines). In case it's
>> helpful, the stacktrace from reading the data from hdfs in IJV format is
>> attached. Thanks again for your help - I really appreciate it.
>>
>>  00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
>> 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>>         at java.lang.System.arraycopy(Native Method)
>>         at
>> org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>>
>>         at
>> org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>>
>>         at
>> org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>>
>>         at
>> org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>>
>>         at
>> org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>
>>         at
>> org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>
>>         at
>> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>
>>         at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>
>>         at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>
>>         at
>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>
>>         at
>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>
>>         at
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>
>>         at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>
>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> Anthony
>>
>>
>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mb...@gmail.com>
>> wrote:
>>
>>> Given the line numbers from the stacktrace, it seems that you use a
>>> rather
>>> old version of SystemML. Hence, I would recommend to upgrade to SystemML
>>> 1.0 or at least 0.15 first.
>>>
>>> If the error persists or you're not able to upgrade, please try to call
>>> dataFrameToBinaryBlock with provided matrix characteristics of
>>> dimensions
>>> and blocksizes. The issue you've shown usually originates from incorrect
>>> meta data (e.g., negative number of columns or block sizes), which
>>> prevents
>>> the sparse rows from growing to the necessary sizes.
>>>
>>> Regards,
>>> Matthias
>>>
>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Thanks for the help! In response to your questions:
>>>>
>>>>    1. Sorry - this was a typo: the correct schema is: [y: int,
>>>> features:
>>>>    vector] - the column "features" was created using Spark's
>>>> VectorAssembler
>>>>    and the underlying type is an
>>>> org.apache.spark.ml.linalg.SparseVector.
>>>>    Calling x.schema results in: org.apache.spark.sql.types.StructType =
>>>>    StructType(StructField(features,org.apache.spark.ml.
>>>>    linalg.VectorUDT@3bfc3ba7,true)
>>>>    2. "y" converts fine - it appears the only issue is with X. The
>>>> script
>>>>    still crashes when running "print(sum(X))". The full stack trace is
>>>>    attached at the end of the message.
>>>>    3. Unfortunately, the error persists when calling
>>>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>>    4. Also just in case this matters: I'm packaging the script into
>>>> a jar
>>>>
>>>>    using SBT assembly and submitting via spark-submit.
>>>>
>>>> Here's an updated script:
>>>>
>>>>         val input_df = spark.read.parquet(inputPath)
>>>>         val x = input_df.select(featureNames)
>>>>         val y = input_df.select("y")
>>>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>>
>>>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>>         println("Reading X")
>>>>         val res_x = ml.execute(script_x)
>>>>
>>>> Here is the output of the runtime plan generated by SystemML:
>>>>
>>>> # EXPLAIN (RUNTIME):
>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>>> PROGRAM ( size CP/SP = 3/0 )
>>>> --MAIN PROGRAM
>>>> ----GENERIC (lines 1-2) [recompile=false]
>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>>> ------CP rmvar _Var0 _Var1
>>>>
>>>> And the resulting stack trace:
>>>>
>>>> 7/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0
>>>> (TID 205,
>>>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(
>>>> SparseRow.java:215)
>>>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.
>>>> append(SparseBlockMCSR.java:253)
>>>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.
>>>> appendValue(MatrixBlock.java:663)
>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConvert
>>>> erUtils$
>>>> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConvert
>>>> erUtils$
>>>> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
>>>> apply(JavaRDDLike.scala:186)
>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
>>>> apply(JavaRDDLike.scala:186)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
>>>> anonfun$apply$23.apply(RDD.scala:797)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
>>>> anonfun$apply$23.apply(RDD.scala:797)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(
>>>> MapPartitionsRDD.scala:38)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>>>> ShuffleMapTask.scala:96)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>>>> ShuffleMapTask.scala:53)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>>>> scala:335)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>> ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>> ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>>>> times; aborting job
>>>> Exception in thread "main" org.apache.sysml.api.mlcontext
>>>> .MLContextException:
>>>> Exception when executing script
>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>>> java:311)
>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>>> java:280)
>>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_
>>>> ml_algorithms.scala:63)
>>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>> NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>> DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
>>>> deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
>>>> SparkSubmit.scala:180)
>>>>     at
>>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>>>> occurred while executing runtime program
>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>>>> Program(
>>>> ScriptExecutor.java:390)
>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.
>>>> execute(ScriptExecutor.java:298)
>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>>> java:303)
>>>>     ... 14 more
>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>>> program block generated from statement block between lines 1 and 2 --
>>>> Error
>>>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>     at org.apache.sysml.runtime.controlprogram.Program.
>>>> execute(Program.java:130)
>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>>>> Program(
>>>> ScriptExecutor.java:388)
>>>>     ... 16 more
>>>> ...
>>>>
>>>>
>>>>
>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com>
>>>> wrote:
>>>>
>>>> well, let's do the following to figure this out:
>>>>>
>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>> please change the third line to val y = input_data.select("label").
>>>>>
>>>>> 2) For debugging, I would recommend to use a simple script like
>>>>> "print(sum(X));" and try converting X and y separately to isolate the
>>>>> problem.
>>>>>
>>>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>>
>>>>> 4) As a workaround you might also call our internal converter directly
>>>>> via:
>>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
>>>>> isVector),
>>>>> where jsc is the java spark context, df is the dataset, mc are matrix
>>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>>> containsID indicates if the dataset contains a column "__INDEX"
>>>>> with the
>>>>> row indexes, and isVector indicates if the passed datasets contains
>>>>> vectors
>>>>> or basic types such as double.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Matthias
>>>>>
>>>>>
>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>
>>>>> Hi SystemML folks,
>>>>>>
>>>>>> I'm trying to pass some data from Spark to a DML script via the
>>>>>> MLContext
>>>>>> API. The data is derived from a parquet file containing a
>>>>>> dataframe with
>>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>>> following:
>>>>>>
>>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>>         val x = input_data.select("features")
>>>>>>         val y = input_data.select("y")
>>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>                 in("X", x, x_meta).
>>>>>>                 in("Y", y, y_meta)
>>>>>>         ...
>>>>>>
>>>>>> However, this results in an error from SystemML:
>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>> I'm guessing this has something to do with SparkML being zero indexed
>>>>>> and
>>>>>> SystemML being 1 indexed. Is there something I should be doing
>>>>>> differently
>>>>>> here? Note that I also tried converting the dataframe to a
>>>>>> CoordinateMatrix
>>>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something
>>>>>> simple
>>>>>> I'm doing wrong here, but I haven't been able to figure out exactly
>>>>>> what.
>>>>>> Please let me know if you need more information (I can send along the
>>>>>> full
>>>>>> error stacktrace if that would be helpful)!
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Anthony
>>>>>>
>>>>>>
>>>>>>
>>>>
>>

Re: Passing a CoordinateMatrix to SystemML

Posted by Matthias Boehm <mb...@gmail.com>.
Hi Anthony,

thanks for helping to debug this issue. There are no limits other than 
the dimensions and number of non-zeros being of type long. It sounds 
more like an issue with converting special cases of ultra-sparse 
matrices. I'll try to reproduce the issue and give an update as soon as 
I know more. In the meantime, could you please (a) also provide the 
stacktrace from calling dataFrameToBinaryBlock with SystemML 1.0, and (b) 
try running your IJV conversion script via spark-submit to rule out that 
the issue is API-related? Thanks.

Regards,
Matthias

On 12/24/2017 1:40 AM, Anthony Thomas wrote:
> Okay thanks for the suggestions - I upgraded to 1.0 and tried providing
> dimensions and blocksizes to dataFrameToBinaryBlock both without success. I
> additionally wrote out the matrix to hdfs in IJV format and am still
> getting the same error when calling "read()" directly in the DML. However,
> I created a 1% sample of the original data in IJV format and SystemML was
> able to read the smaller file without any issue. This would seem to suggest
> that either there is some corruption in the full file or I'm running into
> some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
> 2.4e9 nonzero values, but this seems like it should be well within the
> limits of what SystemML/Spark can handle. I also checked for obvious data
> errors (file is not 1 indexed or contains blank lines). In case it's
> helpful, the stacktrace from reading the data from hdfs in IJV format is
> attached. Thanks again for your help - I really appreciate it.
>
>  00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
> 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>         at java.lang.System.arraycopy(Native Method)
>         at
> org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>         at
> org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>         at
> org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>         at
> org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>         at
> org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>         at
> org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>         at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>         at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>         at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>         at
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>         at
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>         at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>         at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
> Anthony
>
>
> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mb...@gmail.com> wrote:
>
>> Given the line numbers from the stacktrace, it seems that you use a rather
>> old version of SystemML. Hence, I would recommend to upgrade to SystemML
>> 1.0 or at least 0.15 first.
>>
>> If the error persists or you're not able to upgrade, please try to call
>> dataFrameToBinaryBlock with provided matrix characteristics of dimensions
>> and blocksizes. The issue you've shown usually originates from incorrect
>> meta data (e.g., negative number of columns or block sizes), which prevents
>> the sparse rows from growing to the necessary sizes.
>>
>> Regards,
>> Matthias
>>
>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks for the help! In response to your questions:
>>>
>>>    1. Sorry - this was a typo: the correct schema is: [y: int, features:
>>>    vector] - the column "features" was created using Spark's
>>> VectorAssembler
>>>    and the underlying type is an org.apache.spark.ml.linalg.SparseVector.
>>>    Calling x.schema results in: org.apache.spark.sql.types.StructType =
>>>    StructType(StructField(features,org.apache.spark.ml.
>>>    linalg.VectorUDT@3bfc3ba7,true)
>>>    2. "y" converts fine - it appears the only issue is with X. The script
>>>    still crashes when running "print(sum(X))". The full stack trace is
>>>    attached at the end of the message.
>>>    3. Unfortunately, the error persists when calling
>>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>    4. Also just in case this matters: I'm packaging the script into a jar
>>>
>>>    using SBT assembly and submitting via spark-submit.
>>>
>>> Here's an updated script:
>>>
>>>         val input_df = spark.read.parquet(inputPath)
>>>         val x = input_df.select(featureNames)
>>>         val y = input_df.select("y")
>>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>
>>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>         println("Reading X")
>>>         val res_x = ml.execute(script_x)
>>>
>>> Here is the output of the runtime plan generated by SystemML:
>>>
>>> # EXPLAIN (RUNTIME):
>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>> PROGRAM ( size CP/SP = 3/0 )
>>> --MAIN PROGRAM
>>> ----GENERIC (lines 1-2) [recompile=false]
>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>> ------CP rmvar _Var0 _Var1
>>>
>>> And the resulting stack trace:
>>>
>>> 7/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
>>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(
>>> SparseRow.java:215)
>>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.
>>> append(SparseBlockMCSR.java:253)
>>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.
>>> appendValue(MatrixBlock.java:663)
>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConvert
>>> erUtils$
>>> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConvert
>>> erUtils$
>>> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
>>> apply(JavaRDDLike.scala:186)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
>>> apply(JavaRDDLike.scala:186)
>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
>>> anonfun$apply$23.apply(RDD.scala:797)
>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
>>> anonfun$apply$23.apply(RDD.scala:797)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(
>>> MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>>> ShuffleMapTask.scala:96)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>>> ShuffleMapTask.scala:53)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>>> scala:335)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>>> times; aborting job
>>> Exception in thread "main" org.apache.sysml.api.mlcontext
>>> .MLContextException:
>>> Exception when executing script
>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>> java:311)
>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>> java:280)
>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_
>>> ml_algorithms.scala:63)
>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(
>>> NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
>>> deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
>>> SparkSubmit.scala:180)
>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>>> occurred while executing runtime program
>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>>> Program(
>>> ScriptExecutor.java:390)
>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.
>>> execute(ScriptExecutor.java:298)
>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>>> java:303)
>>>     ... 14 more
>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>> program block generated from statement block between lines 1 and 2 --
>>> Error
>>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>     at org.apache.sysml.runtime.controlprogram.Program.
>>> execute(Program.java:130)
>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>>> Program(
>>> ScriptExecutor.java:388)
>>>     ... 16 more
>>> ...
>>>
>>>
>>>
>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com>
>>> wrote:
>>>
>>> well, let's do the following to figure this out:
>>>>
>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>> please change the third line to val y = input_data.select("label").
>>>>
>>>> 2) For debugging, I would recommend to use a simple script like
>>>> "print(sum(X));" and try converting X and y separately to isolate the
>>>> problem.
>>>>
>>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>
>>>> 4) As a workaround you might also call our internal converter directly
>>>> via:
>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
>>>> isVector),
>>>> where jsc is the java spark context, df is the dataset, mc are matrix
>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>> containsID indicates if the dataset contains a column "__INDEX" with the
>>>> row indexes, and isVector indicates if the passed datasets contains
>>>> vectors
>>>> or basic types such as double.
>>>>
>>>>
>>>> Regards,
>>>> Matthias
>>>>
>>>>
>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>
>>>> Hi SystemML folks,
>>>>>
>>>>> I'm trying to pass some data from Spark to a DML script via the
>>>>> MLContext
>>>>> API. The data is derived from a parquet file containing a dataframe with
>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>> following:
>>>>>
>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>         val x = input_data.select("features")
>>>>>         val y = input_data.select("y")
>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>                 in("X", x, x_meta).
>>>>>                 in("Y", y, y_meta)
>>>>>         ...
>>>>>
>>>>> However, this results in an error from SystemML:
>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>> I'm guessing this has something to do with SparkML being zero indexed
>>>>> and
>>>>> SystemML being 1 indexed. Is there something I should be doing
>>>>> differently
>>>>> here? Note that I also tried converting the dataframe to a
>>>>> CoordinateMatrix
>>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something
>>>>> simple
>>>>> I'm doing wrong here, but I haven't been able to figure out exactly
>>>>> what.
>>>>> Please let me know if you need more information (I can send along the
>>>>> full
>>>>> error stacktrace if that would be helpful)!
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Anthony
>>>>>
>>>>>
>>>>>
>>>
>

Re: Passing a CoordinateMatrix to SystemML

Posted by Anthony Thomas <ah...@eng.ucsd.edu>.
Okay thanks for the suggestions - I upgraded to 1.0 and tried providing
dimensions and block sizes to dataFrameToBinaryBlock, both without success. I
additionally wrote out the matrix to hdfs in IJV format and am still
getting the same error when calling "read()" directly in the DML. However,
I created a 1% sample of the original data in IJV format and SystemML was
able to read the smaller file without any issue. This would seem to suggest
that either there is some corruption in the full file or I'm running into
some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
2.4e9 nonzero values, but this seems like it should be well within the
limits of what SystemML/Spark can handle. I also checked for obvious data
errors (e.g., the file not being 1-indexed or containing blank lines); a
rough sketch of that kind of check is included below. In case it's helpful,
the stacktrace from reading the data from hdfs in IJV format follows after
it. Thanks again for your help - I really appreciate it.
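
A check of that kind might look roughly like this (a rough sketch only - the
HDFS path is a placeholder):

        // verifies that the IJV indices are 1-based and within the expected
        // dimensions, and reports the total number of non-zeros; caching avoids
        // re-reading the file for each pass
        val ijv = spark.sparkContext.textFile("hdfs:///tmp/X.ijv")  // placeholder path
          .filter(_.trim.nonEmpty)
          .map { line =>
            val Array(i, j, v) = line.trim.split("\\s+")
            (i.toLong, j.toLong, v.toDouble)  // also catches malformed values
          }
          .cache()
        println(s"row index range: [${ijv.map(_._1).min()}, ${ijv.map(_._1).max()}]")
        println(s"col index range: [${ijv.map(_._2).min()}, ${ijv.map(_._2).max()}]")
        println(s"non-zeros: ${ijv.count()}")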

 00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
        at java.lang.System.arraycopy(Native Method)
        at
org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
        at
org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
        at
org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
        at
org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
        at
org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
        at
org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
        at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
        at
org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
        at
org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
        at
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
        at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
        at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
        at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Anthony


On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mb...@gmail.com> wrote:

> Given the line numbers from the stacktrace, it seems that you use a rather
> old version of SystemML. Hence, I would recommend to upgrade to SystemML
> 1.0 or at least 0.15 first.
>
> If the error persists or you're not able to upgrade, please try to call
> dataFrameToBinaryBlock with provided matrix characteristics of dimensions
> and blocksizes. The issue you've shown usually originates from incorrect
> meta data (e.g., negative number of columns or block sizes), which prevents
> the sparse rows from growing to the necessary sizes.
>
> Regards,
> Matthias
>
> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>
>> Hi Matthias,
>>
>> Thanks for the help! In response to your questions:
>>
>>    1. Sorry - this was a typo: the correct schema is: [y: int, features:
>>    vector] - the column "features" was created using Spark's
>> VectorAssembler
>>    and the underlying type is an org.apache.spark.ml.linalg.SparseVector.
>>    Calling x.schema results in: org.apache.spark.sql.types.StructType =
>>    StructType(StructField(features,org.apache.spark.ml.
>>    linalg.VectorUDT@3bfc3ba7,true)
>>    2. "y" converts fine - it appears the only issue is with X. The script
>>    still crashes when running "print(sum(X))". The full stack trace is
>>    attached at the end of the message.
>>    3. Unfortunately, the error persists when calling
>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>    4. Also just in case this matters: I'm packaging the script into a jar
>>
>>    using SBT assembly and submitting via spark-submit.
>>
>> Here's an updated script:
>>
>>         val input_df = spark.read.parquet(inputPath)
>>         val x = input_df.select(featureNames)
>>         val y = input_df.select("y")
>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>
>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>         println("Reading X")
>>         val res_x = ml.execute(script_x)
>>
>> Here is the output of the runtime plan generated by SystemML:
>>
>> # EXPLAIN (RUNTIME):
>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>> # Degree of Parallelism (vcores) local/remote = 24/?
>> PROGRAM ( size CP/SP = 3/0 )
>> --MAIN PROGRAM
>> ----GENERIC (lines 1-2) [recompile=false]
>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>> ------CP rmvar _Var0 _Var1
>>
>> And the resulting stack trace:
>>
>> 7/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(
>> SparseRow.java:215)
>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.
>> append(SparseBlockMCSR.java:253)
>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.
>> appendValue(MatrixBlock.java:663)
>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConvert
>> erUtils$
>> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConvert
>> erUtils$
>> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
>> apply(JavaRDDLike.scala:186)
>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
>> apply(JavaRDDLike.scala:186)
>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
>> anonfun$apply$23.apply(RDD.scala:797)
>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
>> anonfun$apply$23.apply(RDD.scala:797)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:96)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:53)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>> scala:335)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>> times; aborting job
>> Exception in thread "main" org.apache.sysml.api.mlcontext
>> .MLContextException:
>> Exception when executing script
>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>> java:311)
>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>> java:280)
>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_
>> ml_algorithms.scala:63)
>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(
>> NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
>> deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
>> SparkSubmit.scala:180)
>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>> occurred while executing runtime program
>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>> Program(
>> ScriptExecutor.java:390)
>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.
>> execute(ScriptExecutor.java:298)
>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.
>> java:303)
>>     ... 14 more
>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>> program block generated from statement block between lines 1 and 2 --
>> Error
>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>     at org.apache.sysml.runtime.controlprogram.Program.
>> execute(Program.java:130)
>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntime
>> Program(
>> ScriptExecutor.java:388)
>>     ... 16 more
>> ...
>>
>>
>>
>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com>
>> wrote:
>>
>> well, let's do the following to figure this out:
>>>
>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>> please change the third line to val y = input_data.select("label").
>>>
>>> 2) For debugging, I would recommend to use a simple script like
>>> "print(sum(X));" and try converting X and y separately to isolate the
>>> problem.
>>>
>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>
>>> 4) As a workaround you might also call our internal converter directly
>>> via:
>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
>>> isVector),
>>> where jsc is the java spark context, df is the dataset, mc are matrix
>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>> containsID indicates if the dataset contains a column "__INDEX" with the
>>> row indexes, and isVector indicates if the passed datasets contains
>>> vectors
>>> or basic types such as double.
>>>
>>>
>>> Regards,
>>> Matthias
>>>
>>>
>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>
>>> Hi SystemML folks,
>>>>
>>>> I'm trying to pass some data from Spark to a DML script via the
>>>> MLContext
>>>> API. The data is derived from a parquet file containing a dataframe with
>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>> following:
>>>>
>>>>         val input_data = spark.read.parquet(inputPath)
>>>>         val x = input_data.select("features")
>>>>         val y = input_data.select("y")
>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>                 in("X", x, x_meta).
>>>>                 in("Y", y, y_meta)
>>>>         ...
>>>>
>>>> However, this results in an error from SystemML:
>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>> I'm guessing this has something to do with SparkML being zero indexed
>>>> and
>>>> SystemML being 1 indexed. Is there something I should be doing
>>>> differently
>>>> here? Note that I also tried converting the dataframe to a
>>>> CoordinateMatrix
>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something
>>>> simple
>>>> I'm doing wrong here, but I haven't been able to figure out exactly
>>>> what.
>>>> Please let me know if you need more information (I can send along the
>>>> full
>>>> error stacktrace if that would be helpful)!
>>>>
>>>> Thanks,
>>>>
>>>> Anthony
>>>>
>>>>
>>>>
>>

Re: Passing a CoordinateMatrix to SystemML

Posted by Matthias Boehm <mb...@gmail.com>.
Given the line numbers from the stacktrace, it seems that you are using a 
rather old version of SystemML. Hence, I would recommend upgrading to 
SystemML 1.0, or at least 0.15, first.

If the error persists or you're not able to upgrade, please try calling 
dataFrameToBinaryBlock with explicitly provided matrix characteristics, 
i.e., dimensions and block sizes - see the sketch below. The issue you've 
shown usually originates from incorrect meta data (e.g., a negative number 
of columns or block sizes), which prevents the sparse rows from growing to 
the necessary sizes.
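
For reference, such a call could look roughly like the following (a sketch 
only - the dimensions are placeholders to replace with your real values, the 
block size is assumed to be the default 1000x1000, and the import paths refer 
to a recent SystemML build):

        import org.apache.spark.api.java.JavaSparkContext
        import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
        import org.apache.sysml.runtime.matrix.MatrixCharacteristics

        // x is the DataFrame with the vector column from your snippet
        val jsc = new JavaSparkContext(spark.sparkContext)
        // placeholder dimensions - replace with the true number of rows/columns
        val (numRows, numCols) = (1000000L, 1000L)
        // explicit dimensions and 1000x1000 block size, so nothing has to be inferred
        val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000)
        // containsID = false: no "__INDEX" column; isVector = true: vector-typed column
        val xBin = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)
        // force the conversion so any error surfaces right here
        println(xBin.count())

If the direct call still fails with the explicit sizes, then the meta data is 
probably not the culprit.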

Regards,
Matthias

On 12/22/2017 10:42 PM, Anthony Thomas wrote:
> Hi Matthias,
>
> Thanks for the help! In response to your questions:
>
>    1. Sorry - this was a typo: the correct schema is: [y: int, features:
>    vector] - the column "features" was created using Spark's VectorAssembler
>    and the underlying type is an org.apache.spark.ml.linalg.SparseVector.
>    Calling x.schema results in: org.apache.spark.sql.types.StructType =
>    StructType(StructField(features,org.apache.spark.ml.
>    linalg.VectorUDT@3bfc3ba7,true)
>    2. "y" converts fine - it appears the only issue is with X. The script
>    still crashes when running "print(sum(X))". The full stack trace is
>    attached at the end of the message.
>    3. Unfortunately, the error persists when calling
>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>    4. Also just in case this matters: I'm packaging the script into a jar
>    using SBT assembly and submitting via spark-submit.
>
> Here's an updated script:
>
>         val input_df = spark.read.parquet(inputPath)
>         val x = input_df.select(featureNames)
>         val y = input_df.select("y")
>         val meta_x = new MatrixMetadata(DF_VECTOR)
>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>
>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>         println("Reading X")
>         val res_x = ml.execute(script_x)
>
> Here is the output of the runtime plan generated by SystemML:
>
> # EXPLAIN (RUNTIME):
> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
> # Degree of Parallelism (vcores) local/remote = 24/?
> PROGRAM ( size CP/SP = 3/0 )
> --MAIN PROGRAM
> ----GENERIC (lines 1-2) [recompile=false]
> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
> ------CP rmvar _Var0 _Var1
>
> And the resulting stack trace:
>
> 7/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(
> SparseRow.java:215)
>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.
> append(SparseBlockMCSR.java:253)
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.
> appendValue(MatrixBlock.java:663)
>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$
> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$
> DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
> apply(JavaRDDLike.scala:186)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.
> apply(JavaRDDLike.scala:186)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
> anonfun$apply$23.apply(RDD.scala:797)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$
> anonfun$apply$23.apply(RDD.scala:797)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
> times; aborting job
> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException:
> Exception when executing script
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_
> ml_algorithms.scala:63)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
> SparkSubmit.scala:180)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
> occurred while executing runtime program
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(
> ScriptExecutor.java:390)
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.
> execute(ScriptExecutor.java:298)
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>     ... 14 more
> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
> program block generated from statement block between lines 1 and 2 -- Error
> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>     at org.apache.sysml.runtime.controlprogram.Program.
> execute(Program.java:130)
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(
> ScriptExecutor.java:388)
>     ... 16 more
> ...
>
>
>
> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com> wrote:
>
>> well, let's do the following to figure this out:
>>
>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>> please change the third line to val y = input_data.select("label").
>>
>> 2) For debugging, I would recommend to use a simple script like
>> "print(sum(X));" and try converting X and y separately to isolate the
>> problem.
>>
>> 3) If it's still failing, it would be helpful to know (a) if it's an
>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>
>> 4) As a workaround you might also call our internal converter directly via:
>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
>> isVector),
>> where jsc is the java spark context, df is the dataset, mc are matrix
>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>> containsID indicates if the dataset contains a column "__INDEX" with the
>> row indexes, and isVector indicates if the passed dataset contains vectors
>> or basic types such as double.
>>
>>
>> Regards,
>> Matthias
>>
>>
>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>
>>> Hi SystemML folks,
>>>
>>> I'm trying to pass some data from Spark to a DML script via the MLContext
>>> API. The data is derived from a parquet file containing a dataframe with
>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>> following:
>>>
>>>         val input_data = spark.read.parquet(inputPath)
>>>         val x = input_data.select("features")
>>>         val y = input_data.select("y")
>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>                 in("X", x, x_meta).
>>>                 in("Y", y, y_meta)
>>>         ...
>>>
>>> However, this results in an error from SystemML:
>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>> I'm guessing this has something to do with SparkML being zero indexed and
>>> SystemML being 1 indexed. Is there something I should be doing differently
>>> here? Note that I also tried converting the dataframe to a
>>> CoordinateMatrix
>>> and then creating an RDD[String] in IJV format. That too resulted in
>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
>>> I'm doing wrong here, but I haven't been able to figure out exactly what.
>>> Please let me know if you need more information (I can send along the full
>>> error stacktrace if that would be helpful)!
>>>
>>> Thanks,
>>>
>>> Anthony
>>>
>>>
>

Re: Passing a CoordinateMatrix to SystemML

Posted by Anthony Thomas <ah...@eng.ucsd.edu>.
Hi Matthias,

Thanks for the help! In response to your questions:

   1. Sorry - this was a typo: the correct schema is [y: int, features:
   vector] - the column "features" was created using Spark's VectorAssembler
   and the underlying type is an org.apache.spark.ml.linalg.SparseVector
   (a rough sketch of how such a column is typically assembled follows this
   list). Calling x.schema results in: org.apache.spark.sql.types.StructType =
   StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
   2. "y" converts fine - it appears the only issue is with X. The script
   still crashes when running "print(sum(X))". The full stack trace is
   attached at the end of the message.
   3. Unfortunately, the error persists when calling
   RDDConverterUtils.dataFrameToBinaryBlock directly.
   4. Also just in case this matters: I'm packaging the script into a jar
   using SBT assembly and submitting via spark-submit.
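
   As referenced in point 1, here is a rough sketch of how a [y: int,
   features: vector] dataframe is typically assembled with VectorAssembler
   (the column names below are illustrative placeholders, not the ones from
   my actual pipeline):

        import org.apache.spark.ml.feature.VectorAssembler

        // raw parquet with a label column and several numeric feature columns
        val raw = spark.read.parquet(inputPath)    // e.g. [y: int, f1: double, f2: double, ...]
        val assembler = new VectorAssembler()
          .setInputCols(Array("f1", "f2"))         // placeholder feature column names
          .setOutputCol("features")
        // "features" becomes an org.apache.spark.ml.linalg.Vector (sparse when mostly zeros)
        val input_df = assembler.transform(raw).select("y", "features")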

Here's an updated script:

        val input_df = spark.read.parquet(inputPath)
        val x = input_df.select(featureNames)
        val y = input_df.select("y")
        val meta_x = new MatrixMetadata(DF_VECTOR)
        val meta_y = new MatrixMetadata(DF_DOUBLES)

        val script_x = dml("print(sum(X))").in("X", x, meta_x)
        println("Reading X")
        val res_x = ml.execute(script_x)
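
(For completeness, the snippet above assumes the usual MLContext setup and
imports; roughly the following, with placeholder names for anything not shown
in this thread:)

        import org.apache.spark.sql.SparkSession
        import org.apache.sysml.api.mlcontext.MLContext
        import org.apache.sysml.api.mlcontext.MatrixMetadata
        import org.apache.sysml.api.mlcontext.MatrixFormat.{DF_DOUBLES, DF_VECTOR}
        import org.apache.sysml.api.mlcontext.ScriptFactory.dml

        val spark = SparkSession.builder().getOrCreate()  // session provided by spark-submit
        val ml = new MLContext(spark)                      // MLContext bound to that session
        val inputPath = "..."                              // actual path omitted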

Here is the output of the runtime plan generated by SystemML:

# EXPLAIN (RUNTIME):
# Memory Budget local/remote = 76459MB/?MB/?MB/?MB
# Degree of Parallelism (vcores) local/remote = 24/?
PROGRAM ( size CP/SP = 3/0 )
--MAIN PROGRAM
----GENERIC (lines 1-2) [recompile=false]
------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
------CP rmvar _Var0 _Var1

And the resulting stack trace:

17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205, 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
    at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
    at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
    at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
    at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4 times; aborting job
Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException: Exception when executing script
    at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
    at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
    at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
    at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception occurred while executing runtime program
    at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
    at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
    at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
    ... 14 more
Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 2 -- Error evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
    at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
    at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
    ... 16 more
...



On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mb...@gmail.com> wrote:

> well, let's do the following to figure this out:
>
> 1) If the schema is indeed [label: Integer, features: SparseVector],
> please change the third line to val y = input_data.select("label").
>
> 2) For debugging, I would recommend to use a simple script like
> "print(sum(X));" and try converting X and y separately to isolate the
> problem.
>
> 3) If it's still failing, it would be helpful to know (a) if it's an
> issue of converting X, y, or both, as well as (b) the full stacktrace.
>
> 4) As a workaround you might also call our internal converter directly via:
> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID,
> isVector),
> where jsc is the java spark context, df is the dataset, mc are matrix
> characteristics (if unknown, simply use new MatrixCharacteristics()),
> containsID indicates if the dataset contains a column "__INDEX" with the
> row indexes, and isVector indicates if the passed dataset contains vectors
> or basic types such as double.
>
>
> Regards,
> Matthias
>
>
> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>
>> Hi SystemML folks,
>>
>> I'm trying to pass some data from Spark to a DML script via the MLContext
>> API. The data is derived from a parquet file containing a dataframe with
>> the schema: [label: Integer, features: SparseVector]. I am doing the
>> following:
>>
>>         val input_data = spark.read.parquet(inputPath)
>>         val x = input_data.select("features")
>>         val y = input_data.select("y")
>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>                 in("X", x, x_meta).
>>                 in("Y", y, y_meta)
>>         ...
>>
>> However, this results in an error from SystemML:
>> java.lang.ArrayIndexOutOfBoundsException: 0
>> I'm guessing this has something to do with SparkML being zero indexed and
>> SystemML being 1 indexed. Is there something I should be doing differently
>> here? Note that I also tried converting the dataframe to a
>> CoordinateMatrix
>> and then creating an RDD[String] in IJV format. That too resulted in
>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
>> I'm doing wrong here, but I haven't been able to figure out exactly what.
>> Please let me know if you need more information (I can send along the full
>> error stacktrace if that would be helpful)!
>>
>> Thanks,
>>
>> Anthony
>>
>>

Re: Passing a CoordinateMatrix to SystemML

Posted by Matthias Boehm <mb...@gmail.com>.
well, let's do the following to figure this out:

1) If the schema is indeed [label: Integer, features: SparseVector], 
please change the third line to val y = input_data.select("label").

2) For debugging, I would recommend to use a simple script like 
"print(sum(X));" and try converting X and y separately to isolate the 
problem.

3) If it's still failing, it would be helpful to know (a) if it's an 
issue of converting X, y, or both, as well as (b) the full stacktrace.

4) As a workaround you might also call our internal converter directly via:
RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, 
isVector),
where jsc is the java spark context, df is the dataset, mc are matrix 
characteristics (if unknown, simply use new MatrixCharacteristics()), 
containsID indicates if the dataset contains a column "__INDEX" with the 
row indexes, and isVector indicates if the passed dataset contains 
vectors or basic types such as double.
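
Roughly, in Scala, that call would look like the following (the variable names
here are just placeholders, not prescriptions):

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
import org.apache.sysml.runtime.matrix.MatrixCharacteristics

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.parquet("...").select("features")  // single vector column (path elided)
val jsc = new JavaSparkContext(spark.sparkContext)     // java spark context
val mc  = new MatrixCharacteristics()                  // dimensions unknown up front
// returns a pair RDD of (MatrixIndexes, MatrixBlock) in SystemML's binary block format
val xBlocks = RDDConverterUtils.dataFrameToBinaryBlock(
  jsc, df, mc,
  false,   // containsID: no "__INDEX" column in df
  true)    // isVector:   the data column holds spark.ml vectors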


Regards,
Matthias

On 12/22/2017 12:00 AM, Anthony Thomas wrote:
> Hi SystemML folks,
>
> I'm trying to pass some data from Spark to a DML script via the MLContext
> API. The data is derived from a parquet file containing a dataframe with
> the schema: [label: Integer, features: SparseVector]. I am doing the
> following:
>
>         val input_data = spark.read.parquet(inputPath)
>         val x = input_data.select("features")
>         val y = input_data.select("y")
>         val x_meta = new MatrixMetadata(DF_VECTOR)
>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>         val script = dmlFromFile(s"${script_path}/script.dml").
>                 in("X", x, x_meta).
>                 in("Y", y, y_meta)
>         ...
>
> However, this results in an error from SystemML:
> java.lang.ArrayIndexOutOfBoundsException: 0
> I'm guessing this has something to do with SparkML being zero indexed and
> SystemML being 1 indexed. Is there something I should be doing differently
> here? Note that I also tried converting the dataframe to a CoordinateMatrix
> and then creating an RDD[String] in IJV format. That too resulted in
> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
> I'm doing wrong here, but I haven't been able to figure out exactly what.
> Please let me know if you need more information (I can send along the full
> error stacktrace if that would be helpful)!
>
> Thanks,
>
> Anthony
>