Posted to issues@flink.apache.org by rawkintrevo <gi...@git.apache.org> on 2016/04/16 00:49:01 UTC

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

GitHub user rawkintrevo opened a pull request:

    https://github.com/apache/flink/pull/1898

    [FLINK-2259][ml] Add Train-Testing Splitters

    This PR adds an object in ml/pipeline called splitter with the following methods:
    
    randomSplit: Splits a DataSet into two data sets using DataSet.sample
    multiRandomSplit: Splits a DataSet into multiple datasets according to an array of proportions
    kFoldSplit: Splits a DataSet into k TrainTest objects, each with a testing DataSet 1/k the size of the original DataSet and the remainder of the DataSet as the training set
    trainTestSplit: A wrapper for randomSplit that returns a TrainTest object
    trainTestHoldoutSplit: A wrapper for multiRandomSplit that returns a TrainTestHoldout object
    
    The TrainTest and TrainTestHoldout objects are case classes.  randomSplit and multiRandomSplit return arrays of DataSets.
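
To make the kFoldSplit description concrete, here is a minimal sketch on plain Scala collections rather than Flink DataSets. The object name `KFoldSketch`, the `Seq`-based `TrainTest` class, and the explicit `seed` parameter are assumptions made for illustration; this is not the PR's implementation.

```scala
import scala.util.Random

object KFoldSketch {
  // Hypothetical stand-in for the PR's TrainTest case class, over plain Seqs.
  final case class TrainTest[T](training: Seq[T], testing: Seq[T])

  // Shuffle once, deal the elements round-robin into k folds, then pair each
  // fold (as the testing set) with all the other folds (as the training set).
  def kFoldSplit[T](data: Seq[T], k: Int, seed: Long): Seq[TrainTest[T]] = {
    require(k >= 2, "k must be at least 2")
    val shuffled = new Random(seed).shuffle(data)
    val folds: Seq[Seq[T]] = (0 until k).map { i =>
      shuffled.zipWithIndex.collect { case (x, idx) if idx % k == i => x }
    }
    folds.indices.map { i =>
      val training = folds.indices.filter(_ != i).flatMap(j => folds(j))
      TrainTest(training, folds(i))
    }
  }
}
```

Each element lands in exactly one testing fold, so every observation is used for testing once and for training k - 1 times.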
    
    - [x] General
      
    - [ ] Documentation
      - Documentation is in code, will write markdown after review/feedback/finalization
    
    - [x] Tests & Build


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rawkintrevo/flink train-test-split

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1898.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1898
    
----
commit ec1e65a31d80b33589b73619f2a5dd0a8e09c568
Author: Trevor Grant <tr...@gmail.com>
Date:   2016-04-15T22:37:51Z

    Add Splitter Pre-processing

commit 3ecdc3818dd11a847136510dabe96f444924d319
Author: Trevor Grant <tr...@gmail.com>
Date:   2016-04-15T22:40:38Z

    Add Splitter Pre-processing

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60739128
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    +
    +   (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
    --- End diff --
    
    Maybe we could `union` `splitDataSets(0)` and `splitDataSets(1)`, join the result with `dataSet`, and then count the whole thing. Then we could avoid two executions, because every `count` triggers an execution of the whole pipeline.
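
The property behind this suggestion (the two splits share no elements and together use every element) can be checked in one combined pass instead of three separate counts. Below is a rough sketch on plain Scala collections; `SplitCheckSketch` and `coversExactlyOnce` are hypothetical names, and the sketch assumes the indices in `full` are unique, as `zipWithIndex` would produce. It only mirrors the logic of the union-then-join check, not Flink's execution model.

```scala
object SplitCheckSketch {
  // True when the two splits, taken together, contain exactly the indices of
  // `full`, each exactly once: no overlap between splits and nothing dropped.
  def coversExactlyOnce[T](
      full: Seq[(Long, T)],
      left: Seq[(Long, T)],
      right: Seq[(Long, T)]): Boolean = {
    val unionedIds = (left ++ right).map(_._1) // analogue of `union`
    val fullIds = full.map(_._1)
    // Equal sorted id lists means equal multisets of ids.
    unionedIds.sorted == fullIds.sorted
  }
}
```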



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63498199
  
    --- Diff: docs/apis/batch/libs/ml/cross_validation.md ---
    @@ -0,0 +1,175 @@
    +---
    +mathjax: include
    +title: Cross Validation
    +
    +# Sub navigation
    +sub-nav-group: batch
    +sub-nav-parent: flinkml
    +sub-nav-title: Cross Validation
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    + A prevalent problem when utilizing machine learning algorithms is *overfitting*, or when an algorithm "memorizes" the training data but does a poor job extrapolating to out of sample cases. A common method for dealing with the overfitting problem is to hold back some subset of data from the original training algorithm and then measure the fit algorithm's performance on this hold-out set. This is commonly known as *cross validation*.  A model is trained on one subset of data and then *validated* on another set of data.
    +
    +## Cross Validation Strategies
    +
    +There are several strategies for holding out data. FlinkML has convenience methods for
    +- Train-Test Splits
    +- Train-Test-Holdout Splits
    +- K-Fold Splits
    +- Multi-Random Splits
    +
    +### Train-Test Splits
    +
    +The simplest method of splitting is the `trainTestSplit`. This split takes a DataSet and a parameter *fraction*.  The *fraction* indicates the portion of the DataSet that should be allocated to the training set. This split also takes two additional optional parameters, *precise* and *seed*.  
    +
    +By default, the Split is done by randomly deciding weather or not an observation is assigned to the training DataSet with probability = *fraction*.  When *precise* is `true` however, additional steps are taken to ensure the training set is as close as possible to the length of the DataSet  $\cdot$ *fraction*.
    +
    +The method returns a new `TrainTestDataSet` object which has a `.training` attribute containing the training DataSet and a `.testing` attribute containing the testing DataSet.
    +
    +
    +### Train-Test-Holdout Splits
    +
    +In some cases, algorithms have been known to 'learn' the testing set.  To combat this issue, a train-test-hold out strategy introduces a secondary holdout set, aptly called the *holdout* set.
    +
    +Traditionally, training and testing would be done to train an algorithms as normal and then a final test of the algorithm on the holdout set would be done.  Ideally, prediction errors/model scores in the holdout set would not be significantly different than those observed in the testing set.
    +
    +In a train-test-holdout strategy we sacrifice the sample size of the initial fitting algorithm for increased confidence that our model is not over-fit.
    +
    +When using `trainTestHoldout` splitter, the *fraction* `Double` is replaced by a *fraction* array of length three. The first element coresponds to the portion to be used for training, second for testing, and third for holdout.  The weights of this array are *relative*, e.g. an array `Array(3.0, 2.0, 1.0)` would results in approximately 50% of the observations being in the training set, 33% of the observations in the testing set, and 17% of the observations in holdout set.
    +
    +### K-Fold Splits
    +
    +In a *k-fold* strategy, the DataSet is split into *k* equal subsets. Then for each of the *k* subsets, a `TrainTestDataSet` is created where the subset is the `.training` DataSet, and the remaining subsets are the `.testing` set.
    +
    +For each training set, an algorithm is trained and then is evaluated based on the predictions based on the assosciated testing set. When an algorithm that has consistent grades (e.g. prediction errors) across held out datasets we can have some confidence that our approach (e.g. choice of algorithm / algorithm parameters / number of iterations) is robust against overfitting.
    +
    +<a href="https://en.wikipedia.org/wiki/Cross-validation_(statistics)#k-fold_cross-validation">K-Fold Cross Validatation</a>
    +
    +### Multi-Random Splits
    +
    +The *multi-random* strategy can be thought of as a more general form of the *train-test-holdout* strategy. In fact, `.trainTestHoldoutSplit` is simple a wrapper for `multiRandomSplit` which also packages the datasets into a `trainTestHoldoutDataSet` object.
    --- End diff --
    
    "is a simple wrapper"



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r61582494
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    --- End diff --
    
    I think bootstrapping would be a cool feature, but it would require a different approach than the joins on the leftSplit/rightSplit. 
    
    If you oversample the leftSplit, there's not going to be anything left to put in the rightSplit (the whole point was to keep the training and testing cases separate).
    
    I'm going to add a bootstrap method that will allow for oversampling in the testing and training cases.  Re: the next comment, I will test it separately.
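
For readers unfamiliar with the term, bootstrapping means sampling with replacement, which is why it cannot reuse the disjoint left/right join approach. The following is a minimal sketch on plain Scala collections; `BootstrapSketch`, its signature, and the explicit `seed` are assumptions for illustration and say nothing about how the bootstrap method mentioned above was eventually written.

```scala
import scala.util.Random

object BootstrapSketch {
  // Draw `n` elements uniformly at random WITH replacement, so the same
  // element may appear several times and `n` may exceed the input size.
  def bootstrap[T](data: IndexedSeq[T], n: Int, seed: Long): IndexedSeq[T] = {
    require(data.nonEmpty, "cannot sample from an empty collection")
    val rng = new Random(seed)
    IndexedSeq.fill(n)(data(rng.nextInt(data.size)))
  }
}
```

Because elements can repeat, a bootstrap sample and its source are not disjoint, so training and testing sets built this way need separate handling from `randomSplit`.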
    




[GitHub] flink issue #1898: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1898
  
    Great work @rawkintrevo and thank you very much for your contribution :-) I'm going to merge your PR now. Sorry for the long waiting time.



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-219120765
  
    bump?  failing on flaky test, can someone restart/verify/etc?



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63507717
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import org.apache.flink.util.Collector
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] This fraction
    +   *                        refers to the first element in the resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fraction: Double,
    +      precise: Boolean = false,
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithUniqueId
    +
    +    if ((fraction >= 1) || (fraction <= 0)) {
    +      throw new IllegalArgumentException("sampling fraction outside of (0,1) bounds is nonsensical")
    +    }
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    --- End diff --
    
    Before doing the `leftOuterJoin`, we could strip the payload from `leftSplit` and simply join on the ids. That would reduce the network I/O.
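
The idea of joining on ids only can be sketched on plain Scala collections: sample the left split, keep just its ids, and build the right split as everything whose id was not sampled. `IdOnlySplitSketch` and its signature are hypothetical, and an in-memory `Set` lookup stands in for the distributed join, so this shows the logic rather than the network savings.

```scala
import scala.util.Random

object IdOnlySplitSketch {
  // Returns (left, right). Each element goes left with probability `fraction`;
  // right is every element whose id was not sampled into left.
  def randomSplit[T](input: Seq[T], fraction: Double, seed: Long): (Seq[T], Seq[T]) = {
    require(fraction > 0 && fraction < 1, "fraction must lie strictly between 0 and 1")
    val rng = new Random(seed)
    val indexed: Seq[(Long, T)] =
      input.zipWithIndex.map { case (x, i) => (i.toLong, x) }
    val left = indexed.filter(_ => rng.nextDouble() < fraction)
    // Payload dropped here: only the ids take part in the anti-join.
    val leftIds: Set[Long] = left.map(_._1).toSet
    val right = indexed.filterNot { case (id, _) => leftIds.contains(id) }
    (left.map(_._2), right.map(_._2))
  }
}
```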



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63605367
  
    --- Diff: docs/apis/batch/libs/ml/cross_validation.md ---
    @@ -0,0 +1,175 @@
    +---
    +mathjax: include
    +title: Cross Validation
    +
    +# Sub navigation
    +sub-nav-group: batch
    +sub-nav-parent: flinkml
    +sub-nav-title: Cross Validation
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    + A prevalent problem when utilizing machine learning algorithms is *overfitting*, or when an algorithm "memorizes" the training data but does a poor job extrapolating to out of sample cases. A common method for dealing with the overfitting problem is to hold back some subset of data from the original training algorithm and then measure the fit algorithm's performance on this hold-out set. This is commonly known as *cross validation*.  A model is trained on one subset of data and then *validated* on another set of data.
    +
    +## Cross Validation Strategies
    +
    +There are several strategies for holding out data. FlinkML has convenience methods for
    +- Train-Test Splits
    +- Train-Test-Holdout Splits
    +- K-Fold Splits
    +- Multi-Random Splits
    +
    +### Train-Test Splits
    +
    +The simplest method of splitting is the `trainTestSplit`. This split takes a DataSet and a parameter *fraction*.  The *fraction* indicates the portion of the DataSet that should be allocated to the training set. This split also takes two additional optional parameters, *precise* and *seed*.  
    +
    +By default, the Split is done by randomly deciding weather or not an observation is assigned to the training DataSet with probability = *fraction*.  When *precise* is `true` however, additional steps are taken to ensure the training set is as close as possible to the length of the DataSet  $\cdot$ *fraction*.
    +
    +The method returns a new `TrainTestDataSet` object which has a `.training` attribute containing the training DataSet and a `.testing` attribute containing the testing DataSet.
    +
    +
    +### Train-Test-Holdout Splits
    +
    +In some cases, algorithms have been known to 'learn' the testing set.  To combat this issue, a train-test-hold out strategy introduces a secondary holdout set, aptly called the *holdout* set.
    +
    +Traditionally, training and testing would be done to train an algorithms as normal and then a final test of the algorithm on the holdout set would be done.  Ideally, prediction errors/model scores in the holdout set would not be significantly different than those observed in the testing set.
    +
    +In a train-test-holdout strategy we sacrifice the sample size of the initial fitting algorithm for increased confidence that our model is not over-fit.
    +
    +When using `trainTestHoldout` splitter, the *fraction* `Double` is replaced by a *fraction* array of length three. The first element coresponds to the portion to be used for training, second for testing, and third for holdout.  The weights of this array are *relative*, e.g. an array `Array(3.0, 2.0, 1.0)` would results in approximately 50% of the observations being in the training set, 33% of the observations in the testing set, and 17% of the observations in holdout set.
    +
    +### K-Fold Splits
    +
    +In a *k-fold* strategy, the DataSet is split into *k* equal subsets. Then for each of the *k* subsets, a `TrainTestDataSet` is created where the subset is the `.training` DataSet, and the remaining subsets are the `.testing` set.
    +
    +For each training set, an algorithm is trained and then is evaluated based on the predictions based on the assosciated testing set. When an algorithm that has consistent grades (e.g. prediction errors) across held out datasets we can have some confidence that our approach (e.g. choice of algorithm / algorithm parameters / number of iterations) is robust against overfitting.
    +
    +<a href="https://en.wikipedia.org/wiki/Cross-validation_(statistics)#k-fold_cross-validation">K-Fold Cross Validatation</a>
    --- End diff --
    
    It's been a while since I wrote this, but I vaguely remember having some sort of issue, either with the build or just my Markdown interpreter, specifically on this link. I think it had to do with the parentheses in the link. That's why I fell back to HTML.  Also, I can never get Markdown links to come out correctly in tables.



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-210728264
  
    One build failed on error: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found.
    
    Another failed on some weird YARN stuff, which I didn't touch. Flaky builds?



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60736953
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, number greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The number are normalized, eg. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/66% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    ////
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      println( (splits -k))
    --- End diff --
    
    `println` should be removed
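
For anyone following the loop this comment sits in: the two lines just before it in the diff turn the raw proportions into conditional fractions, so that each pass of the loop samples from whatever is left over after the previous pass. A minimal sketch of that arithmetic, assuming plain Scala arrays in place of DataSets (the names `ConditionalFractions` and `conditionalFractions` are hypothetical and not part of the PR):

```scala
object ConditionalFractions {
  // Mirrors the `aggs` / `fracs` lines from the diff.
  // scanRight builds suffix sums (with a trailing 0.0); zip truncates to the
  // length of `proportions`, pairing each proportion with the sum of itself
  // and everything after it.
  def conditionalFractions(proportions: Array[Double]): Array[Double] = {
    val suffixSums = proportions.scanRight(0.0)(_ + _)
    proportions.zip(suffixSums).map { case (p, remaining) => p / remaining }
  }
}
```

With proportions `Array(1.0, 1.0, 2.0)` the first split takes 1/4 of the input, the second takes 1/3 of the remainder, and the last fraction is 1.0, which is why the loop can stop at `splits-2` and assign the rest directly.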


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60735451
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    --- End diff --
    
    We could write this a bit more efficiently:
    
    ```
    val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
          .where(0)
          .equalTo(0).apply {
            (full: (Long,T) , left: (Long, T), collector: Collector[T]) =>
              if (left == null) {
                collector.collect(full._2)
              }
          }
    ```
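
The idea behind the suggestion can be illustrated without Flink: emit the payload only for unmatched rows, instead of emitting a `null` placeholder and cleaning up with a later `filter` and `map`. This is only a sketch on in-memory collections, assuming a `Seq[(Long, T)]` as a stand-in for the indexed DataSet; `AntiJoinSketch` and `complement` are hypothetical names.

```scala
object AntiJoinSketch {
  // `indexed` plays the role of indexedInput, `sampled` the role of leftSplit.
  def complement[T](indexed: Seq[(Long, T)], sampled: Seq[(Long, T)]): Seq[T] = {
    val sampledIds = sampled.map(_._1).toSet
    // One pass: keep a row only if its index was not sampled, and strip the
    // index at the same time. No null is ever produced.
    indexed.collect { case (id, value) if !sampledIds.contains(id) => value }
  }
}
```

In the join version quoted above, the collector plays the same role as `collect` here: rows with a match simply produce no output.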



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-216241912
  
    This should be good to go.



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-211471397
  
    Yes, the failing tests are most likely caused by the build infrastructure.



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60732832
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    --- End diff --
    
    Remove the whitespace between `scala.` and `DataSet`.



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60738627
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, number greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The number are normalized, eg. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/66% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    ////
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      println( (splits -k))
    +      var temp = Splitter.randomSplit(tempDS, fracs(k), true)
    +      output(k) = temp(0)
    +      tempDS = temp(1)
    +    }
    +    output(splits-1) = tempDS
    +    output
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returns. Each 'testing' will be
    +   *                        1/k of the dataset, randomly sampled, the training will be the remainder
    +   *                        of the dataset.  The DataSet is split into kFolds first, so that no
    +   *                        observation will occurin in multiple folds.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                kFolds: Int,
    +                                                precise: Boolean = false,
    +                                                seed: Long = Utils.RNG.nextLong())
    +  : Array[TrainTestDataSet[T]] = {
    +
    +    val fracs = Array.fill(kFolds)(1.0)
    +    val dataSetArray = multiRandomSplit(input, fracs, precise, seed)
    +
    +    dataSetArray.zipWithIndex.map( ds => TrainTestDataSet(ds._1,
    +                                          unionDataSetArray(dataSetArray.filter(_ != ds._1))) )
    +
    +  }
    +
    +  def unionDataSetArray[T: TypeInformation : ClassTag](dsa : Array[DataSet[T]]): DataSet[T] = {
    +    var dsu = dsa(0)
    +    for (k <- 1 to dsa.length-1) {
    +      dsu = dsu.union(dsa(k))
    +    }
    +    dsu
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for randomSplit that yields a TrainTestDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the training element in
    +   *                        TrainTestSplit
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestDataSet
    +   */
    +  def trainTestSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                     fraction: Double = 0.6,
    +                                                     precise: Boolean = false,
    +                                                     seed: Long = Utils.RNG.nextLong())
    +  : TrainTestDataSet[T] = {
    +    val dataSetArray = randomSplit(input, fraction, precise, seed)
    +    TrainTestDataSet(dataSetArray(0), dataSetArray(1))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestHoldoutSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for multiRandomSplit that yields a TrainTestHoldoutDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of length 3, where the first element specifies the size of the
    +   *                        training set, the second element the testing set, and the third
    +   *                        element is the holdout set. These are proportional and will be
    +   *                        normalized internally.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestDataSet
    +   */
    +  def trainTestHoldoutSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                    fracArray: Array[Double] = Array(0.6,0.3,0.1),
    --- End diff --
    
    Maybe `fracArray` could be a `(Double, Double, Double)` (a `Tuple3`); then we wouldn't have to check whether it has 3 elements.
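
A rough sketch of what a tuple-typed parameter could look like, assuming the proportions are still normalized internally as the Scaladoc describes. `HoldoutProportions` and `normalize` are hypothetical names for illustration, not code from the PR.

```scala
object HoldoutProportions {
  // A Tuple3 fixes the arity at compile time, so no length check is needed.
  def normalize(proportions: (Double, Double, Double)): (Double, Double, Double) = {
    val (train, test, holdout) = proportions
    val total = train + test + holdout
    // Assumption: a non-positive total is treated as a caller error.
    require(total > 0.0, "proportions must sum to a positive value")
    (train / total, test / total, holdout / total)
  }
}
```

A call such as `HoldoutProportions.normalize((2.0, 1.0, 1.0))` yields `(0.5, 0.25, 0.25)`, while passing two or four values would be rejected by the compiler rather than at runtime.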



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60739818
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    --- End diff --
    
    What happens if fraction is larger than `1` and `withReplacement` is set to `false`? Shouldn't it be set to `true` in this case?
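
One possible way to handle this case is to validate `fraction` up front rather than pass it through unchecked. The sketch below is an assumption about how such a guard might look, not what the PR or Flink's sampler does; `FractionCheck` and `checkedFraction` are hypothetical names.

```scala
object FractionCheck {
  // Accepts [0, 1] without replacement and [0, infinity) with replacement,
  // matching the ranges stated in the Scaladoc of randomSplit.
  def checkedFraction(fraction: Double, withReplacement: Boolean): Double = {
    require(fraction >= 0.0, s"fraction must be non-negative, got $fraction")
    require(withReplacement || fraction <= 1.0,
      s"fraction must be at most 1 when sampling without replacement, got $fraction")
    fraction
  }
}
```

Whether to fail fast like this or to switch `withReplacement` to `true` automatically is a design choice; for a train/test split, failing fast keeps the two resulting sets disjoint.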



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60737984
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, number greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The number are normalized, eg. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/66% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    ////
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      println( (splits -k))
    +      var temp = Splitter.randomSplit(tempDS, fracs(k), true)
    +      output(k) = temp(0)
    +      tempDS = temp(1)
    +    }
    +    output(splits-1) = tempDS
    +    output
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returns. Each 'testing' will be
    +   *                        1/k of the dataset, randomly sampled, the training will be the remainder
    +   *                        of the dataset.  The DataSet is split into kFolds first, so that no
    +   *                        observation will occurin in multiple folds.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                kFolds: Int,
    +                                                precise: Boolean = false,
    +                                                seed: Long = Utils.RNG.nextLong())
    +  : Array[TrainTestDataSet[T]] = {
    --- End diff --
    
    Formatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-219910631
  
    Thanks for the review @tillrohrmann 
    
    I need to find a markdown editor with spell check...



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63497505
  
    --- Diff: docs/apis/batch/libs/ml/cross_validation.md ---
    @@ -0,0 +1,175 @@
    +---
    +mathjax: include
    +title: Cross Validation
    +
    +# Sub navigation
    +sub-nav-group: batch
    +sub-nav-parent: flinkml
    +sub-nav-title: Cross Validation
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    + A prevalent problem when utilizing machine learning algorithms is *overfitting*, or when an algorithm "memorizes" the training data but does a poor job extrapolating to out of sample cases. A common method for dealing with the overfitting problem is to hold back some subset of data from the original training algorithm and then measure the fit algorithm's performance on this hold-out set. This is commonly known as *cross validation*.  A model is trained on one subset of data and then *validated* on another set of data.
    +
    +## Cross Validation Strategies
    +
    +There are several strategies for holding out data. FlinkML has convenience methods for
    +- Train-Test Splits
    +- Train-Test-Holdout Splits
    +- K-Fold Splits
    +- Multi-Random Splits
    +
    +### Train-Test Splits
    +
    +The simplest method of splitting is the `trainTestSplit`. This split takes a DataSet and a parameter *fraction*.  The *fraction* indicates the portion of the DataSet that should be allocated to the training set. This split also takes two additional optional parameters, *precise* and *seed*.  
    +
    +By default, the Split is done by randomly deciding weather or not an observation is assigned to the training DataSet with probability = *fraction*.  When *precise* is `true` however, additional steps are taken to ensure the training set is as close as possible to the length of the DataSet  $\cdot$ *fraction*.
    --- End diff --
    
    typo: weather --> whether



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60732781
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    --- End diff --
    
    Following the unofficial Scala style guide, we would format it the following way:
    ```
    def foobar(
        a: Int,
        b: Double)
      : R = {
      code
    } 
    ```
    Parameters are indented twice and the return type once.
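
Applied to a signature like `randomSplit`, the convention described above might look as follows. This is only a sketch under stated assumptions: `StyleExample` is a hypothetical name, a plain `Seq` stands in for Flink's `DataSet` so the block stays self-contained, and the body is a simplified placeholder rather than the PR's implementation.

```scala
import scala.util.Random

// Hypothetical object used only to illustrate the formatting convention.
object StyleExample {
  // Parameters are indented by four spaces, the return type by two.
  def randomSplit[T](
      input: Seq[T],
      fraction: Double,
      seed: Long = 0L)
    : (Seq[T], Seq[T]) = {
    val rng = new Random(seed)
    // Each element goes to the left split with probability `fraction`.
    input.partition(_ => rng.nextDouble() < fraction)
  }
}
```

The leading `:` on its own line is legal Scala because a line break is not treated as a statement separator when the next token cannot start a statement.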


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60738393
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, number greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The number are normalized, eg. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/66% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    ////
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      println( (splits -k))
    +      var temp = Splitter.randomSplit(tempDS, fracs(k), true)
    +      output(k) = temp(0)
    +      tempDS = temp(1)
    +    }
    +    output(splits-1) = tempDS
    +    output
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returns. Each 'testing' will be
    +   *                        1/k of the dataset, randomly sampled, the training will be the remainder
    +   *                        of the dataset.  The DataSet is split into kFolds first, so that no
    +   *                        observation will occurin in multiple folds.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                kFolds: Int,
    +                                                precise: Boolean = false,
    +                                                seed: Long = Utils.RNG.nextLong())
    +  : Array[TrainTestDataSet[T]] = {
    +
    +    val fracs = Array.fill(kFolds)(1.0)
    +    val dataSetArray = multiRandomSplit(input, fracs, precise, seed)
    +
    +    dataSetArray.zipWithIndex.map( ds => TrainTestDataSet(ds._1,
    +                                          unionDataSetArray(dataSetArray.filter(_ != ds._1))) )
    +
    +  }
    +
    +  def unionDataSetArray[T: TypeInformation : ClassTag](dsa : Array[DataSet[T]]): DataSet[T] = {
    +    var dsu = dsa(0)
    +    for (k <- 1 to dsa.length-1) {
    +      dsu = dsu.union(dsa(k))
    +    }
    +    dsu
    --- End diff --
    
    We could express that function as `dsa.reduce(_ union _)`
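
To make the equivalence concrete, here is a minimal sketch that compares the loop from the diff with the `reduce` form. `Bag` is a hypothetical stand-in for a `DataSet` (a collection with a duplicate-preserving `union`), introduced only so the example runs without Flink.

```scala
object UnionExample {
  // Hypothetical stand-in for a DataSet: a bag whose union keeps duplicates.
  final case class Bag[T](elems: Vector[T]) {
    def union(other: Bag[T]): Bag[T] = Bag(elems ++ other.elems)
  }

  // Loop-based version, mirroring unionDataSetArray in the diff.
  def unionLoop[T](dsa: Array[Bag[T]]): Bag[T] = {
    var dsu = dsa(0)
    for (k <- 1 until dsa.length) {
      dsu = dsu.union(dsa(k))
    }
    dsu
  }

  // The same left-to-right union expressed with reduce.
  def unionReduce[T](dsa: Array[Bag[T]]): Bag[T] = dsa.reduce(_ union _)
}
```

Both versions assume a non-empty array: the loop fails on `dsa(0)` and `reduce` throws on an empty collection, so the rewrite does not change that precondition.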



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60737947
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, number greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The number are normalized, eg. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/66% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    ////
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      println( (splits -k))
    +      var temp = Splitter.randomSplit(tempDS, fracs(k), true)
    +      output(k) = temp(0)
    +      tempDS = temp(1)
    +    }
    --- End diff --
    
    I'm not sure this scales well. The for loop constructs a very long pipeline: each iteration contains an outer join and is the input for the next iteration. I'm wondering whether we could instead assign elements to splits in a single pass. For example, we could generate a random number for each element and see which bin it falls into (the bins being given by the normalized fraction array). We could then apply one filter per split to the tagged result in order to obtain the different splits.
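
The single-pass idea described above can be sketched on plain Scala collections. This is an illustration under assumptions, not the PR's code: `SinglePassSplit.split` is a hypothetical helper, `Seq` stands in for `DataSet`, and it covers only the non-precise case, where split sizes match the proportions in expectation.

```scala
import scala.util.Random

object SinglePassSplit {
  // Hypothetical helper, not part of the PR: splits `input` according to the
  // proportions in `fracArray` by tagging each element with a bin in one pass.
  def split[T](input: Seq[T], fracArray: Array[Double], seed: Long): Array[Seq[T]] = {
    require(fracArray.nonEmpty && fracArray.forall(_ > 0.0),
      "fracArray must be non-empty and contain only positive proportions")

    val total = fracArray.sum
    // Normalized cumulative upper bounds, e.g. Array(1.0, 2.0) -> roughly Array(0.333, 1.0).
    val bounds = fracArray.scanLeft(0.0)(_ + _).tail.map(_ / total)
    val rng = new Random(seed)

    // Single pass: draw one random number per element and record its bin.
    val tagged: Seq[(Int, T)] = input.map { element =>
      val r = rng.nextDouble()
      val idx = bounds.indexWhere(r < _)
      // Floating-point rounding could leave the last bound just below 1.0,
      // so fall back to the last bin if no bound matched.
      (if (idx == -1) bounds.length - 1 else idx, element)
    }

    // One filter per split over the tagged result.
    Array.tabulate(fracArray.length) { i =>
      tagged.filter(_._1 == i).map(_._2)
    }
  }
}
```

Every element lands in exactly one bin, so the splits are disjoint and together cover the input without any join. In a distributed setting the random draws would presumably have to be seeded per partition, which this local sketch does not address.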



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r61504020
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    +
    +   (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
    +
    +
    +   splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
    +  }
    +
    +  it should "result in datasets of an expected size when precise" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
    +
    +    val expectedLength = dataSet.count().toDouble * 0.5
    +
    +    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
    --- End diff --
    
    can and statistically will. removing
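    
    For context on why a fixed tolerance is fragile here: with non-precise sampling, each element is kept independently with probability *fraction*, so the size of the first split follows a binomial distribution. A minimal sketch of its spread, in plain Scala with no Flink dependency; `SampleSpread` and the figure n = 100 are hypothetical and not taken from the PR:

```scala
object SampleSpread {
  /** Standard deviation of the number of kept elements when each of `n`
    * elements is kept independently with probability `p`. */
  def stdDev(n: Long, p: Double): Double = {
    require(n >= 0 && p >= 0.0 && p <= 1.0, "need n >= 0 and p in [0, 1]")
    // Binomial variance is n * p * (1 - p).
    math.sqrt(n * p * (1.0 - p))
  }
}
```

    For n = 100 and p = 0.5 this gives 5.0, so a tolerance of `+- 5.0` would sit at only one standard deviation and such an assertion would be expected to fail a noticeable share of the time.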


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-213434810
  
    Thanks for your contribution @rawkintrevo. Good work. I had some minor inline comments. I'm mainly concerned about the efficiency of `multiRandomSplit` because it can construct some really long pipelines.
    
    I think we should also add online documentation for the `Splitter`. Otherwise people will just miss it. You can take a look at `docs/libs/ml/` and create a web page for the splitter. We could then create a site with tools from where we link to the `Splitter`, for example.
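    
    One way the long-pipeline concern might be avoided is to assign every element to a bucket in a single pass, using one uniform draw against cumulative, normalized weights. The following is only a sketch on plain Scala collections, not the PR's implementation; `MultiSplitSketch` and `multiSplit` are hypothetical names, and how this would map onto Flink operators is left open:

```scala
import scala.util.Random

object MultiSplitSketch {
  /** Assigns every element to exactly one bucket in a single pass.
    * Weights are relative and need not sum to 1. */
  def multiSplit[T](data: Seq[T], weights: Array[Double], seed: Long): Vector[Vector[T]] = {
    require(weights.nonEmpty && weights.forall(_ > 0), "weights must be positive")
    val total = weights.sum
    // Cumulative upper bounds, e.g. Array(3, 2, 1) -> 0.5, 0.833..., 1.0
    val bounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total)
    val rng = new Random(seed)
    val buckets = Vector.fill(weights.length)(Vector.newBuilder[T])
    data.foreach { x =>
      val u = rng.nextDouble()
      // First bucket whose upper bound exceeds the draw; fall back to the last
      // bucket to guard against floating-point rounding in the final bound.
      val idx = bounds.indexWhere(u < _)
      buckets(if (idx >= 0) idx else weights.length - 1) += x
    }
    buckets.map(_.result())
  }
}
```

    Because each element gets exactly one draw, the buckets are disjoint and together cover the input, regardless of how many weights are passed.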


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63498396
  
    --- Diff: docs/apis/batch/libs/ml/cross_validation.md ---
    @@ -0,0 +1,175 @@
    +---
    +mathjax: include
    +title: Cross Validation
    +
    +# Sub navigation
    +sub-nav-group: batch
    +sub-nav-parent: flinkml
    +sub-nav-title: Cross Validation
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    + A prevalent problem when utilizing machine learning algorithms is *overfitting*, in which an algorithm "memorizes" the training data but does a poor job extrapolating to out-of-sample cases. A common method for dealing with overfitting is to hold back some subset of the data from training and then measure the fitted model's performance on this hold-out set. This is commonly known as *cross validation*.  A model is trained on one subset of data and then *validated* on another set of data.
    +
    +## Cross Validation Strategies
    +
    +There are several strategies for holding out data. FlinkML has convenience methods for
    +- Train-Test Splits
    +- Train-Test-Holdout Splits
    +- K-Fold Splits
    +- Multi-Random Splits
    +
    +### Train-Test Splits
    +
    +The simplest method of splitting is the `trainTestSplit`. This split takes a DataSet and a parameter *fraction*.  The *fraction* indicates the portion of the DataSet that should be allocated to the training set. This split also takes two additional optional parameters, *precise* and *seed*.  
    +
    +By default, the split is done by randomly deciding whether or not an observation is assigned to the training DataSet with probability = *fraction*.  When *precise* is `true`, however, additional steps are taken to ensure the size of the training set is as close as possible to the length of the DataSet $\cdot$ *fraction*.
    +
    +The method returns a new `TrainTestDataSet` object which has a `.training` attribute containing the training DataSet and a `.testing` attribute containing the testing DataSet.
    +
    +
    +### Train-Test-Holdout Splits
    +
    +In some cases, algorithms have been known to 'learn' the testing set.  To combat this issue, a train-test-holdout strategy introduces a secondary set, aptly called the *holdout* set.
    +
    +Traditionally, an algorithm would be trained and tested as normal, and then given a final test on the holdout set.  Ideally, prediction errors/model scores in the holdout set would not be significantly different from those observed in the testing set.
    +
    +In a train-test-holdout strategy we sacrifice the sample size of the initial fitting algorithm for increased confidence that our model is not over-fit.
    +
    +When using the `trainTestHoldoutSplit` splitter, the *fraction* `Double` is replaced by a *fraction* array of length three. The first element corresponds to the portion to be used for training, the second for testing, and the third for holdout.  The weights of this array are *relative*, e.g. an array `Array(3.0, 2.0, 1.0)` would result in approximately 50% of the observations being in the training set, 33% of the observations in the testing set, and 17% of the observations in the holdout set.
    +
    +### K-Fold Splits
    +
    +In a *k-fold* strategy, the DataSet is split into *k* equal subsets. Then for each of the *k* subsets, a `TrainTestDataSet` is created where the subset is the `.testing` DataSet, and the remaining subsets form the `.training` set.
    +
    +For each training set, an algorithm is trained and then evaluated on its predictions for the associated testing set. When an algorithm has consistent grades (e.g. prediction errors) across the held-out datasets, we can have some confidence that our approach (e.g. choice of algorithm / algorithm parameters / number of iterations) is robust against overfitting.
    +
    +<a href="https://en.wikipedia.org/wiki/Cross-validation_(statistics)#k-fold_cross-validation">K-Fold Cross Validation</a>
    +
    +### Multi-Random Splits
    +
    +The *multi-random* strategy can be thought of as a more general form of the *train-test-holdout* strategy. In fact, `.trainTestHoldoutSplit` is simply a wrapper for `multiRandomSplit` which also packages the datasets into a `TrainTestHoldoutDataSet` object.
    +
    +The first major difference is that `multiRandomSplit` takes an array of fractions of any length, e.g. one can create multiple holdout sets.  Alternatively, one could think of `kFoldSplit` as a wrapper for `multiRandomSplit` (which it is), the difference being that `kFoldSplit` creates subsets of approximately equal size, whereas `multiRandomSplit` will create subsets of any size.
    +
    +The second major difference is that `multiRandomSplit` returns an array of DataSets, equal in size and proportion to the *fraction array* that it was passed as an argument.
    +
    +## Parameters
    +
    +The various `Splitter` methods share many parameters.
    +
    + <table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Parameter</th>
    +      <th class="text-center">Type</th>
    +      <th class="text-center">Description</th>
    +      <th class="text-right">Used by Method</th>
    +    </tr>
    +  </thead>
    +
    +  <tbody>
    +    <tr>
    +      <td><code>input</code></td>
    +      <td><code>DataSet[Any]</code></td>
    +      <td>DataSet to be split.</td>
    +      <td>
    +      <code>randomSplit</code><br>
    +      <code>multiRandomSplit</code><br>
    +      <code>kFoldSplit</code><br>
    +      <code>trainTestSplit</code><br>
    +      <code>trainTestHoldoutSplit</code>
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><code>seed</code></td>
    +      <td><code>Long</code></td>
    +      <td>
    +        <p>
    +          Used for seeding the random number generator which sorts DataSets into other DataSets.
    +        </p>
    +      </td>
    +      <td>
    +      <code>randomSplit</code><br>
    +      <code>multiRandomSplit</code><br>
    +      <code>kFoldSplit</code><br>
    +      <code>trainTestSplit</code><br>
    +      <code>trainTestHoldoutSplit</code>
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><code>precise</code></td>
    +      <td><code>Boolean</code></td>
    +      <td>When true, make additional effort to make DataSets as close to the prescribed proportions as possible.</td>
    +      <td>
    +      <code>randomSplit</code><br>
    +      <code>trainTestSplit</code>
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><code>fraction</code></td>
    +      <td><code>Double</code></td>
    +      <td>The portion of the <code>input</code> to assign to the first or <code>.training</code> DataSet. Must be in the range (0,1).</td>
    +      <td><code>randomSplit</code><br>
    +        <code>trainTestSplit</code>
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><code>fracArray</code></td>
    +      <td><code>Array[Double]</code></td>
    +      <td>An array that prescribes the proportions of the output datasets (proportions need not sum to 1 or be within the range (0,1))</td>
    +      <td>
    +      <code>multiRandomSplit</code><br>
    +      <code>trainTestHoldoutSplit</code>
    +      </td>
    +    </tr>
    +    <tr>
    +      <td><code>kFolds</code></td>
    +      <td><code>Int</code></td>
    +      <td>The number of subsets to break the <code>input</code> DataSet into.</td>
    +      <td><code>kFoldSplit</code></td>
    +      </tr>
    +
    +  </tbody>
    +</table>
    +
    +## Examples
    +
    +{% highlight scala %}
    +// An input dataset- does not have to be of type LabeledVector
    +val data: DataSet[LabeledVector] = ...
    +
    +// A Simple Train-Test-Split
    +val dataTrainTest: TrainTestDataSet = Splitter.trainTestSplit(data, 0.6, true )
    --- End diff --
    
    whitespace before closing parenthesis


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63499292
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import org.apache.flink.util.Collector
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, in (0,1). This fraction
    +   *                        refers to the first element in the resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fraction: Double,
    +      precise: Boolean = false,
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithUniqueId
    +
    +    if ((fraction >= 1) || (fraction <= 0)) {
    +      throw new IllegalArgumentException("sampling fraction outside of (0,1) bounds is nonsensical")
    +    }
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    --- End diff --
    
    The `count` operation can be quite expensive since we're triggering the execution of the Flink job at this point. At the moment, the `sampleWithSize` method requires a number of samples as the parameter. Maybe we could extend the functionality in the future so that one can also give a fraction value instead. Then the method could calculate the count without the `count` method by simply doing a reduce operation and then broadcasting the result to all sample operators.
    
    So maybe we could add a comment that this could/should be improved once we have this functionality.
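    
    To make the idea concrete, here is a rough sketch on plain Scala collections: the element count is expressed as a map followed by a reduction, and the target sample size is derived from the fraction. This is only a stand-in for the distributed case; `PreciseSplitSketch` and `preciseSplit` are hypothetical names, and the broadcast step is not modelled:

```scala
import scala.util.Random

object PreciseSplitSketch {
  /** Splits `data` so that the first part has round(count * fraction) elements. */
  def preciseSplit[T](data: Seq[T], fraction: Double, seed: Long): (Seq[T], Seq[T]) = {
    require(fraction > 0 && fraction < 1, "fraction must be in (0, 1)")
    // Count expressed as map + reduction (stand-in for a distributed reduce).
    val count = data.map(_ => 1L).foldLeft(0L)(_ + _)
    val sampleSize = math.round(count * fraction).toInt
    // Seeded shuffle, then cut at the target size.
    val shuffled = new Random(seed).shuffle(data)
    shuffled.splitAt(sampleSize)
  }
}
```

    In this sketch the size of the first part is determined entirely by the reduced count and the fraction, which is the property the `precise` flag is meant to provide.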


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63508795
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithUniqueId, 0.5)
    +
    +   (splitDataSets(0).union(splitDataSets(1)).count()) should equal(dataSet.count())
    +
    +
    +   splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
    +  }
    +
    +  it should "result in datasets of an expected size when precise" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet, 0.5, true)
    +
    +    val expectedLength = data.size.toDouble * 0.5
    +
    +    splitDataSets(0).count().toDouble should equal(expectedLength +- 1.0)
    +  }
    +
    +  it should "result in expected number of datasets" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val fracArray = Array(0.5, 0.25, 0.25)
    +
    +    val splitDataSets = Splitter.multiRandomSplit(dataSet, fracArray)
    +
    +    splitDataSets.length should equal(fracArray.length)
    +  }
    +
    +  it should "throw an exception if trainTestHoldoutSplit fracArray length is not 3" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    intercept[IllegalArgumentException] {
    +      val splitDataSets = Splitter.trainTestHoldoutSplit(dataSet, Array(0.25, 0.25, 0.05, 0.15))
    +    }
    +
    +    intercept[IllegalArgumentException] {
    +      val splitDataSets = Splitter.trainTestHoldoutSplit(dataSet, Array(0.25, 0.25))
    +    }
    +
    +  }
    +
    +  it should "throw an exception if sample fraction is nonsensical" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    intercept[IllegalArgumentException] {
    +      val splitDataSets = Splitter.randomSplit(dataSet, -0.2)
    +    }
    +
    +    intercept[IllegalArgumentException] {
    +      val splitDataSets = Splitter.randomSplit(dataSet, 1.2)
    +    }
    +
    +  }
    --- End diff --
    
    Maybe we could add a test for k-Fold.
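    
    The properties such a test might check can be sketched on plain Scala collections. This is not the PR's `kFoldSplit`; `KFoldSketch` and `TrainTest` are hypothetical stand-ins, and the sketch follows the PR description in which each testing set holds 1/k of the data:

```scala
import scala.util.Random

object KFoldSketch {
  final case class TrainTest[T](training: Seq[T], testing: Seq[T])

  /** Builds k train/test pairs; each element is in exactly one testing set. */
  def kFoldSplit[T](data: Seq[T], k: Int, seed: Long): Seq[TrainTest[T]] = {
    require(k >= 2 && k <= data.size, "k must be between 2 and the number of elements")
    val shuffled = new Random(seed).shuffle(data).zipWithIndex
    (0 until k).map { fold =>
      // The element at shuffled position i goes to the testing set of fold (i mod k).
      val (test, train) = shuffled.partition { case (_, i) => i % k == fold }
      TrainTest(train.map(_._1), test.map(_._1))
    }
  }
}
```

    Useful assertions would be that exactly k pairs are returned, that training and testing sets of a pair do not overlap, and that the testing sets together cover the whole input.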


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-219696600
  
    The PR looks really good @rawkintrevo. I had some minor comments. Once they are fixed, we should be good to merge this PR :-)


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60739883
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    +
    +   (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
    +
    +
    +   splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
    +  }
    +
    +  it should "result in datasets of an expected size when precise" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
    +
    +    val expectedLength = dataSet.count().toDouble * 0.5
    +
    +    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
    +  }
    +
    +  it should "result in expected number of datasets" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val fracArray = Array(0.5, 0.25, 0.25)
    +
    +    val splitDataSets = Splitter.multiRandomSplit(dataSet, fracArray)
    +
    +    splitDataSets.length should equal(fracArray.length)
    +  }
    +
    --- End diff --
    
    Maybe we could add a test case for sampling with replacement?
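    
    As an illustration of what such a test could assert, here is a sketch of sampling with replacement on plain Scala collections. `ReplacementSketch` is a hypothetical helper, not part of the `Splitter` API shown in this PR:

```scala
import scala.util.Random

object ReplacementSketch {
  /** Draws `n` elements uniformly at random, allowing repeats. */
  def sampleWithReplacement[T](data: IndexedSeq[T], n: Int, seed: Long): Vector[T] = {
    require(data.nonEmpty, "cannot sample from an empty collection")
    val rng = new Random(seed)
    // Each draw picks an index independently, so an element can appear many times.
    Vector.fill(n)(data(rng.nextInt(data.size)))
  }
}
```

    Drawing more samples than there are source elements guarantees duplicates, so a test could check that the sample has the requested size, that every sampled element comes from the source, and that the number of distinct elements is smaller than the sample size.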


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60739425
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    +
    +   (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
    +
    +
    +   splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
    +  }
    +
    +  it should "result in datasets of an expected size when precise" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
    +
    +    val expectedLength = dataSet.count().toDouble * 0.5
    --- End diff --
    
    We can get the size of `dataSet` without executing a job: simply use `data.size`.


---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63498117
  
    --- Diff: docs/apis/batch/libs/ml/cross_validation.md ---
    @@ -0,0 +1,175 @@
    +---
    +mathjax: include
    +title: Cross Validation
    +
    +# Sub navigation
    +sub-nav-group: batch
    +sub-nav-parent: flinkml
    +sub-nav-title: Cross Validation
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    + A prevalent problem when utilizing machine learning algorithms is *overfitting*, in which an algorithm "memorizes" the training data but does a poor job extrapolating to out-of-sample cases. A common method for dealing with overfitting is to hold back some subset of the data from training and then measure the fitted model's performance on this hold-out set. This is commonly known as *cross validation*.  A model is trained on one subset of data and then *validated* on another set of data.
    +
    +## Cross Validation Strategies
    +
    +There are several strategies for holding out data. FlinkML has convenience methods for
    +- Train-Test Splits
    +- Train-Test-Holdout Splits
    +- K-Fold Splits
    +- Multi-Random Splits
    +
    +### Train-Test Splits
    +
    +The simplest method of splitting is the `trainTestSplit`. This split takes a DataSet and a parameter *fraction*.  The *fraction* indicates the portion of the DataSet that should be allocated to the training set. This split also takes two additional optional parameters, *precise* and *seed*.  
    +
    +By default, the split is done by randomly deciding whether or not an observation is assigned to the training DataSet with probability = *fraction*.  When *precise* is `true`, however, additional steps are taken to ensure the size of the training set is as close as possible to the length of the DataSet $\cdot$ *fraction*.
    +
    +The method returns a new `TrainTestDataSet` object which has a `.training` attribute containing the training DataSet and a `.testing` attribute containing the testing DataSet.
    +
    +
    +### Train-Test-Holdout Splits
    +
    +In some cases, algorithms have been known to 'learn' the testing set.  To combat this issue, a train-test-holdout strategy introduces a secondary set, aptly called the *holdout* set.
    +
    +Traditionally, an algorithm would be trained and tested as normal, and then given a final test on the holdout set.  Ideally, prediction errors/model scores in the holdout set would not be significantly different from those observed in the testing set.
    +
    +In a train-test-holdout strategy we sacrifice the sample size of the initial fitting algorithm for increased confidence that our model is not over-fit.
    +
    +When using the `trainTestHoldoutSplit` splitter, the *fraction* `Double` is replaced by a *fraction* array of length three. The first element corresponds to the portion to be used for training, the second for testing, and the third for holdout.  The weights of this array are *relative*, e.g. an array `Array(3.0, 2.0, 1.0)` would result in approximately 50% of the observations being in the training set, 33% of the observations in the testing set, and 17% of the observations in the holdout set.
    +
    +### K-Fold Splits
    +
    +In a *k-fold* strategy, the DataSet is split into *k* equal subsets. Then for each of the *k* subsets, a `TrainTestDataSet` is created where the subset is the `.testing` DataSet, and the remaining subsets form the `.training` set.
    +
    +For each training set, an algorithm is trained and then evaluated based on its predictions on the associated testing set. When an algorithm has consistent grades (e.g. prediction errors) across held-out datasets, we can have some confidence that our approach (e.g. choice of algorithm / algorithm parameters / number of iterations) is robust against overfitting.
    +
    +<a href="https://en.wikipedia.org/wiki/Cross-validation_(statistics)#k-fold_cross-validation">K-Fold Cross Validation</a>
    --- End diff --
    
    We could use markdown syntax for links, but it is not really important.
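
The k-fold strategy described in the quoted documentation can be sketched on plain Scala collections. This is only an illustration under assumptions: `KFoldSketch`, `TrainTest`, and `kFold` are hypothetical names that are not part of the PR, and a deterministic round-robin fold assignment stands in for the random sampling that the splitter performs on a `DataSet`.

```scala
object KFoldSketch {
  // Hypothetical plain-collection analogue of TrainTestDataSet (not the PR's class).
  final case class TrainTest[T](training: Seq[T], testing: Seq[T])

  // Assign element i to fold (i % k), then hold each fold out once as the testing set.
  def kFold[T](data: Seq[T], k: Int): Seq[TrainTest[T]] = {
    require(k >= 2, "k-fold needs at least two folds")
    val indexed = data.zipWithIndex
    val folds: Seq[Seq[T]] =
      (0 until k).map(f => indexed.collect { case (x, i) if i % k == f => x })
    folds.indices.map { held =>
      // Training data is the union of every fold except the held-out one.
      val training = folds.indices.filter(_ != held).flatMap(i => folds(i))
      TrainTest(training, folds(held))
    }
  }
}
```

Each element lands in exactly one testing set, so averaging a score over the `k` results uses every observation for validation once.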


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-211507574
  
    Any thoughts on this approach, or should I go ahead and write the docs and prepare to merge?



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r61503436
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. When fraction is larger
    +   *                        than 1, elements are expected to be selected multiple times into
    +   *                        the sample on average. This fraction refers to the first element
    +   *                        in the resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, numbers greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The numbers are normalized, e.g. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/67% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    // Peel off one split at a time; fracs(k) is the share of the remaining data.
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      val temp = Splitter.randomSplit(tempDS, fracs(k), precise, seed)
    +      output(k) = temp(0)
    +      tempDS = temp(1)
    +    }
    +    output(splits-1) = tempDS
    +    output
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returned. Each 'testing' will be
    +   *                        1/k of the dataset, randomly sampled; the training will be the remainder
    +   *                        of the dataset.  The DataSet is split into kFolds first, so that no
    +   *                        observation will occur in multiple folds.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                kFolds: Int,
    +                                                precise: Boolean = false,
    +                                                seed: Long = Utils.RNG.nextLong())
    +  : Array[TrainTestDataSet[T]] = {
    +
    +    val fracs = Array.fill(kFolds)(1.0)
    +    val dataSetArray = multiRandomSplit(input, fracs, precise, seed)
    +
    +    // Each fold is held out once as testing; the remaining folds form the training set.
    +    dataSetArray.map( ds => TrainTestDataSet(unionDataSetArray(dataSetArray.filter(_ != ds)),
    +                                             ds) )
    +
    +  }
    +
    +  def unionDataSetArray[T: TypeInformation : ClassTag](dsa : Array[DataSet[T]]): DataSet[T] = {
    +    var dsu = dsa(0)
    +    for (k <- 1 to dsa.length-1) {
    +      dsu = dsu.union(dsa(k))
    +    }
    +    dsu
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for randomSplit that yields a TrainTestDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. When fraction is larger
    +   *                        than 1, elements are expected to be selected multiple times into
    +   *                        the sample on average. This fraction refers to the training element
    +   *                        of the returned TrainTestDataSet.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestDataSet
    +   */
    +  def trainTestSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                     fraction: Double = 0.6,
    +                                                     precise: Boolean = false,
    +                                                     seed: Long = Utils.RNG.nextLong())
    +  : TrainTestDataSet[T] = {
    +    val dataSetArray = randomSplit(input, fraction, precise, seed)
    +    TrainTestDataSet(dataSetArray(0), dataSetArray(1))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestHoldoutSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for multiRandomSplit that yields a TrainTestHoldoutDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of length 3, where the first element specifies the size of the
    +   *                        training set, the second element the testing set, and the third
    +   *                        element is the holdout set. These are proportional and will be
    +   *                        normalized internally.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestHoldoutDataSet
    +   */
    +  def trainTestHoldoutSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                    fracArray: Array[Double] = Array(0.6,0.3,0.1),
    --- End diff --
    
    I couldn't figure out a way to coerce a Tuple3 into a typed array that was less invasive than simply throwing the exception.
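
One possible alternative, sketched here under assumptions, is to accept the three weights as a tuple so that the length is fixed by the type, and convert to an array internally. `FractionSketch`, `toArray`, and `normalize` are hypothetical helpers that do not appear in the PR; the normalization mirrors the "relative weights" behaviour described in the docs for `fracArray`.

```scala
object FractionSketch {
  // Convert a (training, testing, holdout) triple into the Array[Double] form.
  // A tuple parameter cannot have the wrong length, so no runtime length check is needed.
  def toArray(fractions: (Double, Double, Double)): Array[Double] = {
    val (training, testing, holdout) = fractions
    Array(training, testing, holdout)
  }

  // Normalize relative weights so that they sum to 1.
  def normalize(weights: Array[Double]): Array[Double] = {
    require(weights.nonEmpty && weights.forall(_ > 0.0), "weights must be positive")
    val total = weights.sum
    weights.map(_ / total)
  }
}
```

With weights `(3.0, 2.0, 1.0)` this gives shares of one half, one third, and one sixth, matching the roughly 50/33/17% split mentioned in the documentation.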



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60733883
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. When fraction is larger
    +   *                        than 1, elements are expected to be selected multiple times into
    +   *                        the sample on average. This fraction refers to the first element
    +   *                        in the resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    --- End diff --
    
    It should be sufficient to use `zipWithUniqueId` here. It is more efficient than `zipWithIndex`.
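
The join that builds the right-hand split only needs keys that are unique, not consecutive, which is why unique ids are enough. The sketch below illustrates this on plain collections. The id scheme (`local * p + partitionId`) is an assumption chosen for illustration rather than a description of Flink's internals, and `UniqueIdSketch` and its methods are hypothetical names.

```scala
object UniqueIdSketch {
  // Tag elements with ids that are unique but not consecutive: each simulated
  // partition numbers its own elements without knowing the sizes of the others.
  def zipWithUniqueIds[T](partitions: Seq[Seq[T]]): Seq[(Long, T)] = {
    val p = partitions.length.toLong
    partitions.zipWithIndex.flatMap { case (part, partitionId) =>
      part.zipWithIndex.map { case (x, local) => (local * p + partitionId, x) }
    }
  }

  // Complement of `selected` within `all`, matched on the id only.
  def complement[T](all: Seq[(Long, T)], selected: Seq[(Long, T)]): Seq[T] = {
    val chosen = selected.map(_._1).toSet
    all.collect { case (id, x) if !chosen(id) => x }
  }
}
```

Because no partition needs a global element count before it can start labelling, ids of this kind can be assigned in a single pass, whereas consecutive indices require counting first.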



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63498454
  
    --- Diff: docs/apis/batch/libs/ml/index.md ---
    @@ -86,10 +87,18 @@ Now you can start solving your analysis task.
     The following code snippet shows how easy it is to train a multiple linear regression model.
     
     {% highlight scala %}
    +
    +
    --- End diff --
    
    Why insert two line breaks here?



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60732457
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    --- End diff --
    
    nice ∞ :-)



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60738935
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    +  it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    --- End diff --
    
    `zipWithUniqueId` should be fine here
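
The property this test is after (no elements in common, all elements used) can be sketched without Flink. The code below is a minimal plain-collection analogue, assuming distinct input elements; `SplitCheckSketch`, its `randomSplit`, and `isPartition` are hypothetical names and not the PR's API.

```scala
import scala.util.Random

object SplitCheckSketch {
  // Bernoulli split: each element goes to the left side with probability `fraction`.
  def randomSplit[T](data: Seq[T], fraction: Double, seed: Long): (Seq[T], Seq[T]) = {
    require(fraction > 0.0 && fraction < 1.0, "fraction must lie in (0, 1)")
    val rng = new Random(seed)
    // Draw once per element up front so that the decision is made exactly once.
    val tagged = data.toVector.map(x => (x, rng.nextDouble() < fraction))
    val (left, right) = tagged.partition(_._2)
    (left.map(_._1), right.map(_._1))
  }

  // True when the two sides are disjoint and together contain every input element.
  def isPartition[T](data: Seq[T], left: Seq[T], right: Seq[T]): Boolean =
    left.intersect(right).isEmpty &&
      left.length + right.length == data.length &&
      (left ++ right).toSet == data.toSet
}
```

The check depends on the elements being distinguishable, which is why the test tags each element with an id before splitting.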



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63508672
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,210 @@
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import org.apache.flink.util.Collector
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, must be in (0, 1). This
    +   *                        fraction refers to the first element in the resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, the exact sample size is forced;
    +   *                        however, this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fraction: Double,
    +      precise: Boolean = false,
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithUniqueId
    +
    +    if ((fraction >= 1) || (fraction <= 0)) {
    +      throw new IllegalArgumentException("sampling fraction outside of (0,1) bounds is nonsensical")
    +    }
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0).apply {
    +        (full: (Long,T) , left: (Long, T), collector: Collector[T]) =>
    +        if (left == null) {
    +          collector.collect(full._2)
    +        }
    +    }
    +
    +    Array(leftSplit.map(o => o._2), rightSplit)
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, numbers greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The numbers are normalized, e.g. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/67% split.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fracArray: Array[Double],
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[DataSet[T]] = {
    +
    +    import org.apache.commons.math3.distribution.EnumeratedIntegerDistribution
    +
    +    val eid = new EnumeratedIntegerDistribution((0 to fracArray.length - 1).toArray, fracArray)
    +
    +    eid.reseedRandomGenerator(seed)
    +
    +    val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o))
    +
    +    val splits = fracArray.length
    +    val outputArray = new Array[DataSet[T]]( splits )
    +
    +    for (k <- 0 to splits-1){
    +      outputArray(k) = tempDS.filter(o => o._1 == k)
    +                             .map(o => o._2)
    +    }
    +
    +    outputArray
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returned. Each 'testing' will be
    +   *                        1/k of the dataset, randomly sampled, the training will be the remainder
    +   *                        of the dataset.  The DataSet is split into kFolds first, so that no
    +   *                        observation will occur in multiple folds.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      kFolds: Int,
    +      seed: Long = Utils.RNG.nextLong())
    +    : Array[TrainTestDataSet[T]] = {
    +
    +    val fracs = Array.fill(kFolds)(1.0)
    +    val dataSetArray = multiRandomSplit(input, fracs, seed)
    +
    +    dataSetArray.map( ds => TrainTestDataSet(ds,
    +                                             dataSetArray.filter(_ != ds)
    +                                                         .reduce(_ union _) ))
    +
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for randomSplit that yields a TrainTestDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1].
    +   *                        This fraction refers to the training element in TrainTestSplit
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestDataSet
    +   */
    +  def trainTestSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fraction: Double = 0.6,
    +      precise: Boolean = false,
    +      seed: Long = Utils.RNG.nextLong())
    +    : TrainTestDataSet[T] = {
    +    val dataSetArray = randomSplit(input, fraction, precise, seed)
    +    TrainTestDataSet(dataSetArray(0), dataSetArray(1))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  trainTestHoldoutSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * A wrapper for multiRandomSplit that yields a TrainTestHoldoutDataSet
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of three doubles, where the first element specifies the
    +   *                        size of the training set, the second element the testing set, and
    +   *                        the third element is the holdout set. These are proportional and
    +   *                        will be normalized internally.
    +   * @param seed            Random number generator seed.
    +   * @return A TrainTestHoldoutDataSet
    +   */
    +  def trainTestHoldoutSplit[T: TypeInformation : ClassTag](
    +      input: DataSet[T],
    +      fracArray: Array[Double] = Array(0.6,0.3,0.1),
    --- End diff --
    
    Why not require a `Tuple3[Double, Double, Double]` here? Then we wouldn't have to check that the array has the correct length, which removes one way the user can shoot himself in the foot ;-)
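
A minimal sketch of the tuple-based signature suggested above, written against plain Scala collections instead of Flink DataSets so that it runs standalone. The names `TupleSplitSketch`, `Split3` and the default seed are hypothetical and not part of the PR; only the idea of replacing the array with a three-element tuple comes from the comment.

```scala
object TupleSplitSketch {
  // Hypothetical stand-in for TrainTestHoldoutDataSet, holding plain Seqs.
  final case class Split3[T](training: Seq[T], testing: Seq[T], holdout: Seq[T])

  // With a tuple parameter, passing the wrong number of proportions does not compile,
  // so no runtime length check is needed.
  def trainTestHoldoutSplit[T](
      input: Seq[T],
      fractions: (Double, Double, Double) = (0.6, 0.3, 0.1),
      seed: Long = 42L): Split3[T] = {
    val (train, test, holdout) = fractions
    require(train >= 0 && test >= 0 && holdout >= 0, "proportions must be non-negative")
    val total = train + test + holdout
    require(total > 0, "at least one proportion must be positive")

    val rng = new scala.util.Random(seed)
    // Tag each element with bucket 0, 1 or 2 by comparing a uniform draw
    // against the cumulative (unnormalized) proportions.
    val tagged = input.map { x =>
      val u = rng.nextDouble() * total
      val bucket = if (u < train) 0 else if (u < train + test) 1 else 2
      (bucket, x)
    }
    def pick(b: Int): Seq[T] = tagged.filter(_._1 == b).map(_._2)
    Split3(pick(0), pick(1), pick(2))
  }
}
```

The values still have to be validated (negative or all-zero proportions), so the tuple removes only the length check, not every precondition.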


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r63498002
  
    --- Diff: docs/apis/batch/libs/ml/cross_validation.md ---
    @@ -0,0 +1,175 @@
    +---
    +mathjax: include
    +title: Cross Validation
    +
    +# Sub navigation
    +sub-nav-group: batch
    +sub-nav-parent: flinkml
    +sub-nav-title: Cross Validation
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    +A prevalent problem when utilizing machine learning algorithms is *overfitting*, where an algorithm "memorizes" the training data but does a poor job extrapolating to out-of-sample cases. A common method for dealing with the overfitting problem is to hold back some subset of data from the original training algorithm and then measure the fitted algorithm's performance on this hold-out set. This is commonly known as *cross validation*.  A model is trained on one subset of data and then *validated* on another set of data.
    +
    +## Cross Validation Strategies
    +
    +There are several strategies for holding out data. FlinkML has convenience methods for
    +- Train-Test Splits
    +- Train-Test-Holdout Splits
    +- K-Fold Splits
    +- Multi-Random Splits
    +
    +### Train-Test Splits
    +
    +The simplest method of splitting is the `trainTestSplit`. This split takes a DataSet and a parameter *fraction*.  The *fraction* indicates the portion of the DataSet that should be allocated to the training set. This split also takes two additional optional parameters, *precise* and *seed*.  
    +
    +By default, the split is done by randomly deciding whether or not an observation is assigned to the training DataSet with probability = *fraction*.  When *precise* is `true` however, additional steps are taken to ensure the training set is as close as possible to the length of the DataSet  $\cdot$ *fraction*.
    +
    +The method returns a new `TrainTestDataSet` object which has a `.training` attribute containing the training DataSet and a `.testing` attribute containing the testing DataSet.
    +
    +
    +### Train-Test-Holdout Splits
    +
    +In some cases, algorithms have been known to 'learn' the testing set.  To combat this issue, a train-test-holdout strategy introduces a secondary set, aptly called the *holdout* set.
    +
    +Traditionally, training and testing would be done to train an algorithm as normal, and then a final test of the algorithm would be done on the holdout set.  Ideally, prediction errors/model scores on the holdout set would not be significantly different from those observed on the testing set.
    +
    +In a train-test-holdout strategy we sacrifice the sample size of the initial fitting algorithm for increased confidence that our model is not over-fit.
    +
    +When using the `trainTestHoldoutSplit` splitter, the *fraction* `Double` is replaced by a *fraction* array of length three. The first element corresponds to the portion to be used for training, the second for testing, and the third for holdout.  The weights of this array are *relative*, e.g. an array `Array(3.0, 2.0, 1.0)` would result in approximately 50% of the observations being in the training set, 33% of the observations in the testing set, and 17% of the observations in the holdout set.
    +
    +### K-Fold Splits
    +
    +In a *k-fold* strategy, the DataSet is split into *k* equal subsets. Then for each of the *k* subsets, a `TrainTestDataSet` is created where the subset is the `.training` DataSet, and the remaining subsets are the `.testing` set.
    +
    +For each training set, an algorithm is trained and then is evaluated based on the predictions based on the assosciated testing set. When an algorithm that has consistent grades (e.g. prediction errors) across held out datasets we can have some confidence that our approach (e.g. choice of algorithm / algorithm parameters / number of iterations) is robust against overfitting.
    --- End diff --
    
    typo: assosciated --> associated
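
The relative weights described in the quoted documentation can be illustrated with a small standalone sketch. `ProportionSketch` and both function names are hypothetical. `sequentialFractions` mirrors the `scanRight` computation that appears in one quoted revision of `multiRandomSplit` in this thread, where each split takes its share from whatever remains after the previous splits.

```scala
object ProportionSketch {
  // Turn relative weights into shares that sum to 1.
  def normalize(weights: Array[Double]): Array[Double] = {
    require(weights.nonEmpty && weights.forall(_ >= 0) && weights.sum > 0,
      "weights must be non-negative with a positive sum")
    val total = weights.sum
    weights.map(_ / total)
  }

  // Fraction to take from the *remaining* data at each step of a sequential split.
  // scanRight yields suffix sums plus a trailing 0.0, which zip then drops.
  def sequentialFractions(weights: Array[Double]): Array[Double] = {
    val remaining = weights.scanRight(0.0)(_ + _)
    weights.zip(remaining).map { case (w, rest) => w / rest }
  }
}
```

For `Array(3.0, 2.0, 1.0)` the normalized shares are roughly 0.50, 0.33 and 0.17, while the sequential fractions are 0.5 (of everything), about 0.67 (of the remaining half) and 1.0 (of what is left). A trailing zero weight would produce 0/0 in `sequentialFractions`, so a real implementation would need to guard that case.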



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60735615
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, numbers greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The numbers are normalized, e.g. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/67% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    --- End diff --
    
    formatting of the function header



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60739344
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    +
    +   (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
    +
    +
    +   splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
    +  }
    +
    +  it should "result in datasets of an expected size when precise" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
    +
    +    val expectedLength = dataSet.count().toDouble * 0.5
    +
    +    splitDataSets(0).count().toDouble should equal(expectedLength +- 5.0)
    --- End diff --
    
    Does this mean that the test case could fail? Even if it's unlikely?
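
To make the concern concrete: when every element is kept independently with probability `fraction`, the sample size follows a binomial distribution, so an assertion with a fixed tolerance can fail for some seeds. The sketch below uses plain Scala collections rather than `DataSet.sample`; `SeededSampleSketch` and `bernoulliSample` are hypothetical names used only for illustration.

```scala
object SeededSampleSketch {
  // Bernoulli sampling: keep each element independently with probability `fraction`.
  // The resulting size is random; only its expectation is fraction * input.size.
  def bernoulliSample[T](input: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
    val rng = new scala.util.Random(seed)
    input.filter(_ => rng.nextDouble() < fraction)
  }
}
```

With n elements the standard deviation of the sample size is sqrt(n * fraction * (1 - fraction)). Passing an explicit seed makes this sketch reproducible, so a test built on it either always passes or always fails; whether the same holds for the Flink sampler depends on how it uses the seed across parallel tasks, which is not shown in this thread.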



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60738251
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
    +import org.apache.flink.api.java.Utils
    +import org.apache.flink.api.scala. DataSet
    +import org.apache.flink.api.scala.utils._
    +
    +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
    +import _root_.scala.reflect.ClassTag
    +
    +object Splitter {
    +
    +  case class TrainTestDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                             testing: DataSet[T])
    +
    +  case class TrainTestHoldoutDataSet[T: TypeInformation : ClassTag](training: DataSet[T],
    +                                                                    testing: DataSet[T],
    +                                                                    holdout: DataSet[T])
    +  // --------------------------------------------------------------------------------------------
    +  //  randomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fraction        Probability that each element is chosen, should be [0,1] without
    +   *                        replacement, and [0, ∞) with replacement. While fraction is larger
    +   *                        than 1, the elements are expected to be selected multi times into
    +   *                        sample on average. This fraction refers to the first element in the
    +   *                        resulting array.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of two datasets
    +   */
    +
    +  def randomSplit[T: TypeInformation : ClassTag]( input: DataSet[T],
    +                                                  fraction: Double,
    +                                                  precise: Boolean = false,
    +                                                  seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    import org.apache.flink.api.scala._
    +
    +    val indexedInput: DataSet[(Long, T)] = input.zipWithIndex
    +
    +    val leftSplit: DataSet[(Long, T)] = precise match {
    +      case false => indexedInput.sample(false, fraction, seed)
    +      case true => {
    +        val count = indexedInput.count()
    +        val numOfSamples = math.round(fraction * count).toInt
    +        indexedInput.sampleWithSize(false, numOfSamples, seed)
    +      }
    +    }
    +
    +    val rightSplit: DataSet[(Long, T)] = indexedInput.leftOuterJoin[(Long, T)](leftSplit)
    +      .where(0)
    +      .equalTo(0) {
    +        (full: (Long,T) , left: (Long, T)) =>  (if (left == null) full else null)
    +      }
    +      .filter( o => o != null )
    +    Array(leftSplit.map(o => o._2), rightSplit.map(o => o._2))
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  multiRandomSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet by the probability fraction of each element of a vector.
    +   *
    +   * @param input           DataSet to be split
    +   * @param fracArray       An array of PROPORTIONS for splitting the DataSet. Unlike the
    +   *                        randomSplit function, numbers greater than 1 do not lead to over
    +   *                        sampling. The number of splits is dictated by the length of this array.
    +   *                        The numbers are normalized, e.g. Array(1.0, 2.0) would yield
    +   *                        two data sets with a 33/67% split.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of DataSets whose length is equal to the length of fracArray
    +   */
    +  def multiRandomSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                        fracArray: Array[Double],
    +                        precise: Boolean = false,
    +                        seed: Long = Utils.RNG.nextLong())
    +  : Array[DataSet[T]] = {
    +    val splits = fracArray.length
    +    val output = new Array[DataSet[T]](splits)
    +    val aggs = fracArray.scanRight((0.0))( _ + _ )
    +    val fracs = fracArray.zip(aggs).map( o => o._1 / o._2)
    +
    +    ////
    +    var tempDS = input
    +    for (k <- 0 to splits-2){
    +      println( (splits -k))
    +      var temp = Splitter.randomSplit(tempDS, fracs(k), true)
    +      output(k) = temp(0)
    +      tempDS = temp(1)
    +    }
    +    output(splits-1) = tempDS
    +    output
    +  }
    +
    +  // --------------------------------------------------------------------------------------------
    +  //  kFoldSplit
    +  // --------------------------------------------------------------------------------------------
    +  /**
    +   * Split a DataSet into an array of TrainTest DataSets
    +   *
    +   * @param input           DataSet to be split
    +   * @param kFolds          The number of TrainTest DataSets to be returned. Each 'testing' will be
    +   *                        1/k of the dataset, randomly sampled, the training will be the remainder
    +   *                        of the dataset.  The DataSet is split into kFolds first, so that no
    +   *                        observation will occur in multiple folds.
    +   * @param precise         Sampling by default is random and can result in slightly lop-sided
    +   *                        sample sets. When precise is true, equal sample set size are forced,
    +   *                        however this is somewhat less efficient.
    +   * @param seed            Random number generator seed.
    +   * @return An array of TrainTestDataSets
    +   */
    +  def kFoldSplit[T: TypeInformation : ClassTag](input: DataSet[T],
    +                                                kFolds: Int,
    +                                                precise: Boolean = false,
    +                                                seed: Long = Utils.RNG.nextLong())
    +  : Array[TrainTestDataSet[T]] = {
    +
    +    val fracs = Array.fill(kFolds)(1.0)
    +    val dataSetArray = multiRandomSplit(input, fracs, precise, seed)
    +
    +    dataSetArray.zipWithIndex.map( ds => TrainTestDataSet(ds._1,
    --- End diff --
    
    For what do you need the `zipWithIndex` here?
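
The quoted diff is cut off before the index is used, so its purpose there is not visible. One plausible use of a position index in k-fold assembly is to exclude the held-out fold by position rather than by equality. The sketch below shows that on plain Scala collections; `KFoldSketch` and `TrainTest` are hypothetical names, and the sketch follows the Scaladoc's convention that the single fold is the testing set.

```scala
object KFoldSketch {
  // Hypothetical stand-in for TrainTestDataSet, holding plain Seqs.
  final case class TrainTest[T](training: Seq[T], testing: Seq[T])

  // `folds` are assumed to be disjoint partitions of the data.
  def kFold[T](folds: IndexedSeq[Seq[T]]): IndexedSeq[TrainTest[T]] = {
    require(folds.size >= 2, "need at least two folds")
    folds.indices.map { i =>
      val testing = folds(i)
      // Exclude the held-out fold by position, so that two folds with equal
      // contents are not both dropped by an equality-based filter.
      val training = folds.indices.filter(_ != i).flatMap(j => folds(j))
      TrainTest(training, testing)
    }
  }
}
```

Iterating over `folds.indices` gives the same effect as `zipWithIndex` without building the intermediate pairs.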



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-215826478
  
    Hey @tillrohrmann, thanks for the review.  I've addressed your comments in the code. I am in the middle of the documentation but had to commit. Should finish up the docs this afternoon.



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1898#discussion_r60739509
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/SplitterITSuite.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala._
    +import org.apache.flink.test.util.FlinkTestBase
    +import org.scalatest.{Matchers, FlatSpec}
    +import org.apache.flink.ml.math.Vector
    +import org.apache.flink.api.scala.utils._
    +
    +
    +class SplitterITSuite extends FlatSpec
    +  with Matchers
    +  with FlinkTestBase {
    +
    +  behavior of "Flink's DataSet Splitter"
    +
    +  import MinMaxScalerData._
    +
    + it should "result in datasets with no elements in common and all elements used" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet.zipWithIndex, 0.5)
    +
    +   (splitDataSets(0).count() + splitDataSets(1).count()) should equal(dataSet.count())
    +
    +
    +   splitDataSets(0).join(splitDataSets(1)).where(0).equalTo(0).count() should equal(0)
    +  }
    +
    +  it should "result in datasets of an expected size when precise" in {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val dataSet = env.fromCollection(data)
    +
    +    val splitDataSets = Splitter.randomSplit(dataSet, 0.5)
    --- End diff --
    
    Why don't we use the precise sampling here?
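
For comparison, precise sampling fixes the sample size at `round(fraction * count)`, which is how the quoted `randomSplit` computes `numOfSamples` before calling `sampleWithSize`. A standalone sketch of the same idea on plain Scala collections follows; `PreciseSampleSketch` and `preciseSplit` are hypothetical names, and the shuffle-then-cut approach is an assumption for illustration, not how Flink's sampler works.

```scala
object PreciseSampleSketch {
  // Returns (sample, remainder); the sample has exactly round(fraction * n) elements.
  def preciseSplit[T](input: Seq[T], fraction: Double, seed: Long): (Seq[T], Seq[T]) = {
    require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
    val numOfSamples = math.round(fraction * input.size).toInt
    // Shuffle with a seeded generator, then cut at the required size.
    val shuffled = new scala.util.Random(seed).shuffle(input)
    shuffled.splitAt(numOfSamples)
  }
}
```

With an exact sample size, a test can assert equality on the count instead of relying on a tolerance.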



[GitHub] flink pull request #1898: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1898



[GitHub] flink pull request: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1898#issuecomment-219696697
  
    As you've said, the failing test case is unrelated and therefore not a problem.



[GitHub] flink issue #1898: [FLINK-2259][ml] Add Train-Testing Splitters

Posted by rawkintrevo <gi...@git.apache.org>.
Github user rawkintrevo commented on the issue:

    https://github.com/apache/flink/pull/1898
  
    No problem, @tillrohrmann! I know there's been a lot going on. Thank you!

