Posted to dev@submarine.apache.org by pi...@apache.org on 2020/03/29 09:52:04 UTC

[submarine] branch master updated: SUBMARINE-450. [SDK] Refactor ML model codebase

This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 43fe754  SUBMARINE-450. [SDK] Refactor ML model codebase
43fe754 is described below

commit 43fe75440aa1d595c1630220d7ce145ddf7efac4
Author: pingsutw <pi...@gmail.com>
AuthorDate: Tue Mar 24 04:17:14 2020 +0800

    SUBMARINE-450. [SDK] Refactor ML model codebase
    
    ### What is this PR for?
    Refactor deepfm.py and fm.py so that both models reuse the shared layers in submarine/ml/layers/core.py.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-450
    
    ### How should this be tested?
    https://travis-ci.org/github/pingsutw/hadoop-submarine/builds/666228791
    https://github.com/pingsutw/hadoop-submarine/actions/runs/62059056
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: pingsutw <pi...@gmail.com>
    
    Closes #246 from pingsutw/SUBMARINE-450 and squashes the following commits:
    
    ce57041 [pingsutw] SUBMARINE-450. [SDK] Refactor ML model codebase
---
 .../pysubmarine/submarine/ml/layers/core.py        | 55 +++++++++++-----
 .../pysubmarine/submarine/ml/model/deepfm.py       | 76 ++++------------------
 submarine-sdk/pysubmarine/submarine/ml/model/fm.py | 42 +++---------
 .../pysubmarine/submarine/ml/model/nfm.py          |  5 +-
 4 files changed, 61 insertions(+), 117 deletions(-)

diff --git a/submarine-sdk/pysubmarine/submarine/ml/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/layers/core.py
index 9a52024..bf72185 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/layers/core.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/layers/core.py
@@ -26,11 +26,11 @@ def batch_norm_layer(x, train_phase, scope_bn, batch_norm_decay):
     return tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
 
 
-def dnn_layer(deep_inputs, estimator_mode, batch_norm, deep_layers, dropout, batch_norm_decay=0.9,
+def dnn_layer(inputs, estimator_mode, batch_norm, deep_layers, dropout, batch_norm_decay=0.9,
               l2_reg=0, **kwargs):
     """
     The Multi-Layer Perceptron.
-    :param deep_inputs: A tensor of at least rank 2 and static value for the last dimension; i.e.
+    :param inputs: A tensor of at least rank 2 and static value for the last dimension; i.e.
            [batch_size, depth], [None, None, None, channels].
     :param estimator_mode: Standard names for Estimator model modes. `TRAIN`, `EVAL`, `PREDICT`
     :param batch_norm: Whether use BatchNormalization before activation or not.
@@ -51,7 +51,7 @@ def dnn_layer(deep_inputs, estimator_mode, batch_norm, deep_layers, dropout, bat
 
         for i in range(len(deep_layers)):
             deep_inputs = tf.contrib.layers.fully_connected(
-                inputs=deep_inputs, num_outputs=deep_layers[i],
+                inputs=inputs if i == 0 else deep_inputs, num_outputs=deep_layers[i],
                 weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
                 scope='mlp%d' % i)
             if batch_norm:
@@ -66,7 +66,7 @@ def dnn_layer(deep_inputs, estimator_mode, batch_norm, deep_layers, dropout, bat
             weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
             scope='deep_out')
         deep_out = tf.reshape(deep_out, shape=[-1])
-        return deep_out
+    return deep_out
 
 
 def linear_layer(features, feature_size, field_size, l2_reg=0, **kwargs):
@@ -91,15 +91,15 @@ def linear_layer(features, feature_size, field_size, l2_reg=0, **kwargs):
                                         initializer=tf.glorot_normal_initializer(),
                                         regularizer=regularizer)
 
-    feat_weights = tf.nn.embedding_lookup(linear_weight, feat_ids)
-    linear_out = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1) + linear_bias
+        feat_weights = tf.nn.embedding_lookup(linear_weight, feat_ids)
+        linear_out = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1) + linear_bias
     return linear_out
 
 
-def bilinear_layer(features, feature_size, field_size, embedding_size, l2_reg=0, **kwargs):
+def embedding_layer(features, feature_size, field_size, embedding_size, l2_reg=0, **kwargs):
     """
-    Bi-Interaction Layer used in Neural FM,compress the pairwise element-wise product of features
-    into one single vector.
+    Turns positive integers (indexes) into dense vectors of fixed size,
+    e.g. [[4], [20]] -> [[0.25, 0.1], [0.6, -0.2]].
     :param features: input features
     :param feature_size: size of features
     :param field_size: number of fields in the features
@@ -112,17 +112,40 @@ def bilinear_layer(features, feature_size, field_size, embedding_size, l2_reg=0,
     feat_vals = features['feat_vals']
     feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
 
-    with tf.variable_scope("BilinearLayer_Layer"):
+    with tf.variable_scope("Embedding_Layer"):
         regularizer = tf.contrib.layers.l2_regularizer(l2_reg)
         embedding_dict = tf.get_variable(name='embedding_dict',
                                          shape=[feature_size, embedding_size],
                                          initializer=tf.glorot_normal_initializer(),
                                          regularizer=regularizer)
+        embeddings = tf.nn.embedding_lookup(embedding_dict, feat_ids)
+        feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
+        embedding_out = tf.multiply(embeddings, feat_vals)
+    return embedding_out
+
+
+def bilinear_layer(inputs, **kwargs):
+    """
+    Bi-Interaction Layer used in Neural FM; compresses the pairwise element-wise
+    products of features into a single vector.
+    :param inputs: input features
+    """
 
-    embeddings = tf.nn.embedding_lookup(embedding_dict, feat_ids)
-    feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
-    embeddings = tf.multiply(embeddings, feat_vals)
-    sum_square = tf.square(tf.reduce_sum(embeddings, 1))
-    square_sum = tf.reduce_sum(tf.square(embeddings), 1)
-    bilinear_out = 0.5 * tf.subtract(sum_square, square_sum)
+    with tf.variable_scope("BilinearLayer_Layer"):
+        sum_square = tf.square(tf.reduce_sum(inputs, 1))
+        square_sum = tf.reduce_sum(tf.square(inputs), 1)
+        bilinear_out = 0.5 * tf.subtract(sum_square, square_sum)
     return bilinear_out
+
+
+def fm_layer(inputs, **kwargs):
+    """
+    Factorization Machine layer: models pairwise (order-2) feature interactions,
+    without the linear term and bias.
+    :param inputs: input features
+    """
+    with tf.variable_scope("FM_Layer"):
+        sum_square = tf.square(tf.reduce_sum(inputs, 1))
+        square_sum = tf.reduce_sum(tf.square(inputs), 1)
+        fm_out = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)
+    return fm_out
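
For readers of this hunk: bilinear_layer and fm_layer both rely on the
standard FM identity that the sum over all pairwise element-wise products of
the field embeddings equals half of (square-of-sum minus sum-of-squares),
which costs O(field_size) instead of O(field_size^2). A minimal NumPy sketch
of the identity (illustration only, not part of this commit; the shapes are
made up):

    import numpy as np

    rng = np.random.default_rng(0)
    embeddings = rng.normal(size=(3, 4))  # [field_size, embedding_size]

    # Explicit pairwise element-wise products, summed over all pairs i < j.
    pairwise = sum(embeddings[i] * embeddings[j]
                   for i in range(3) for j in range(i + 1, 3))

    # The trick used in bilinear_layer (one vector per example).
    sum_square = np.square(embeddings.sum(axis=0))
    square_sum = np.square(embeddings).sum(axis=0)
    bilinear_out = 0.5 * (sum_square - square_sum)

    assert np.allclose(pairwise, bilinear_out)

    # fm_layer reduces the same quantity to a scalar logit.
    fm_out = bilinear_out.sum()
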
diff --git a/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py
index 9c6d506..ff7dd76 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py
@@ -27,9 +27,8 @@ Reference:
 
 import logging
 import tensorflow as tf
-import numpy as np
 from submarine.ml.model.base_tf_model import BaseTFModel
-from submarine.ml.layers.core import batch_norm_layer
+from submarine.ml.layers.core import fm_layer, linear_layer, dnn_layer, embedding_layer
 from submarine.utils.tf_utils import get_estimator_spec
 
 logger = logging.getLogger(__name__)
@@ -37,72 +36,19 @@ logger = logging.getLogger(__name__)
 
 class DeepFM(BaseTFModel):
     def model_fn(self, features, labels, mode, params):
-        field_size = params["training"]["field_size"]
-        feature_size = params["training"]["feature_size"]
-        embedding_size = params["training"]["embedding_size"]
-        l2_reg = params["training"]["l2_reg"]
-        batch_norm = params["training"]["batch_norm"]
-        batch_norm_decay = params["training"]["batch_norm_decay"]
-        seed = params["training"]["seed"]
-        layers = params["training"]["deep_layers"]
-        dropout = params["training"]["dropout"]
+        super().model_fn(features, labels, mode, params)
 
-        np.random.seed(seed)
-        tf.set_random_seed(seed)
+        linear_logit = linear_layer(features, **params['training'])
 
-        fm_bias = tf.get_variable(name='fm_bias', shape=[1],
-                                  initializer=tf.constant_initializer(0.0))
-        fm_weight = tf.get_variable(name='fm_weight', shape=[feature_size],
-                                    initializer=tf.glorot_normal_initializer())
-        fm_vector = tf.get_variable(name='fm_vector', shape=[feature_size, embedding_size],
-                                    initializer=tf.glorot_normal_initializer())
+        embedding_outputs = embedding_layer(features, **params['training'])
+        fm_logit = fm_layer(embedding_outputs, **params['training'])
 
-        with tf.variable_scope("Feature"):
-            feat_ids = features['feat_ids']
-            feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
-            feat_vals = features['feat_vals']
-            feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
+        field_size = params['training']['field_size']
+        embedding_size = params['training']['embedding_size']
+        deep_inputs = tf.reshape(embedding_outputs, shape=[-1, field_size * embedding_size])
+        deep_logit = dnn_layer(deep_inputs, mode, **params['training'])
 
-        with tf.variable_scope("First_order"):
-            feat_weights = tf.nn.embedding_lookup(fm_weight, feat_ids)
-            y_w = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1)
-
-        with tf.variable_scope("Second_order"):
-            embeddings = tf.nn.embedding_lookup(fm_vector, feat_ids)
-            feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
-            embeddings = tf.multiply(embeddings, feat_vals)
-            sum_square = tf.square(tf.reduce_sum(embeddings, 1))
-            square_sum = tf.reduce_sum(tf.square(embeddings), 1)
-            y_v = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)
-
-        with tf.variable_scope("Deep-part"):
-            if batch_norm:
-                if mode == tf.estimator.ModeKeys.TRAIN:
-                    train_phase = True
-                else:
-                    train_phase = False
-
-            deep_inputs = tf.reshape(embeddings, shape=[-1, field_size * embedding_size])
-            for i in range(len(layers)):
-                deep_inputs = tf.contrib.layers.fully_connected(
-                    inputs=deep_inputs, num_outputs=layers[i],
-                    weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
-                    scope='mlp%d' % i)
-                if batch_norm:
-                    deep_inputs = batch_norm_layer(
-                        deep_inputs, train_phase=train_phase,
-                        scope_bn='bn_%d' % i, batch_norm_decay=batch_norm_decay)
-                if mode == tf.estimator.ModeKeys.TRAIN:
-                    deep_inputs = tf.nn.dropout(deep_inputs, keep_prob=dropout[i])
-
-            y_deep = tf.contrib.layers.fully_connected(
-                inputs=deep_inputs, num_outputs=1, activation_fn=tf.identity,
-                weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
-                scope='deep_out')
-            y_d = tf.reshape(y_deep, shape=[-1])
-
-        with tf.variable_scope("DeepFM-out"):
-            y_bias = fm_bias * tf.ones_like(y_d, dtype=tf.float32)
-            logit = y_bias + y_w + y_v + y_d
+        with tf.variable_scope("DeepFM_out"):
+            logit = linear_logit + fm_logit + deep_logit
 
         return get_estimator_spec(logit, labels, mode, params)
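
Since every refactored layer is invoked as layer(..., **params['training'])
and declares **kwargs, each layer picks out only the keys it needs, so a
single flat training config can be shared by all of them. A hedged sketch of
such a config (every key below is read somewhere in this diff; the values are
invented):

    params = {
        'training': {
            'field_size': 39,           # feature fields per example
            'feature_size': 117581,     # vocabulary size for feat_ids
            'embedding_size': 16,       # latent dimension per feature
            'deep_layers': [400, 400],  # hidden units per dnn_layer step
            'dropout': [0.5, 0.5],      # keep_prob per hidden layer (TRAIN only)
            'batch_norm': True,
            'batch_norm_decay': 0.9,
            'l2_reg': 0.01,
            'seed': 77,
        }
    }
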
diff --git a/submarine-sdk/pysubmarine/submarine/ml/model/fm.py b/submarine-sdk/pysubmarine/submarine/ml/model/fm.py
index 19e4c65..4dc82db 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/model/fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/model/fm.py
@@ -23,7 +23,7 @@ Reference:
 
 import logging
 import tensorflow as tf
-import numpy as np
+from submarine.ml.layers.core import linear_layer, fm_layer, embedding_layer
 from submarine.ml.model.base_tf_model import BaseTFModel
 from submarine.utils.tf_utils import get_estimator_spec
 
@@ -32,39 +32,13 @@ logger = logging.getLogger(__name__)
 
 class FM(BaseTFModel):
     def model_fn(self, features, labels, mode, params):
-        field_size = params["training"]["field_size"]
-        feature_size = params["training"]["feature_size"]
-        embedding_size = params["training"]["embedding_size"]
-        seed = params["training"]["seed"]
+        super().model_fn(features, labels, mode, params)
 
-        np.random.seed(seed)
-        tf.set_random_seed(seed)
+        linear_logit = linear_layer(features, **params['training'])
+        embedding_outputs = embedding_layer(features, **params['training'])
+        fm_logit = fm_layer(embedding_outputs, **params['training'])
 
-        fm_bias = tf.get_variable(name='fm_bias', shape=[1],
-                                  initializer=tf.constant_initializer(0.0))
-        fm_weight = tf.get_variable(name='fm_weight', shape=[feature_size],
-                                    initializer=tf.glorot_normal_initializer())
-        fm_vector = tf.get_variable(name='fm_vector', shape=[feature_size, embedding_size],
-                                    initializer=tf.glorot_normal_initializer())
+        with tf.variable_scope("FM_out"):
+            logit = linear_logit + fm_logit
 
-        with tf.variable_scope("Feature"):
-            feat_ids = features['feat_ids']
-            feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
-            feat_vals = features['feat_vals']
-            feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
-
-        with tf.variable_scope("First_order"):
-            feat_weights = tf.nn.embedding_lookup(fm_weight, feat_ids)
-            y_w = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1)
-
-        with tf.variable_scope("Second_order"):
-            embeddings = tf.nn.embedding_lookup(fm_vector, feat_ids)
-            feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
-            embeddings = tf.multiply(embeddings, feat_vals)
-            sum_square = tf.square(tf.reduce_sum(embeddings, 1))
-            square_sum = tf.reduce_sum(tf.square(embeddings), 1)
-            y_v = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)
-
-        y = fm_bias + y_w + y_v
-
-        return get_estimator_spec(y, labels, mode, params)
+        return get_estimator_spec(logit, labels, mode, params)
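
With this change, fm.py's model_fn is a direct transcription of the FM
prediction equation (Rendle, 2010): linear_layer supplies the bias and
first-order term, and fm_layer the second-order term. In LaTeX:

    \hat{y}(x) = \underbrace{w_0 + \sum_{i=1}^{n} w_i x_i}_{\text{linear\_logit}}
               + \underbrace{\sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle v_i, v_j \rangle\, x_i x_j}_{\text{fm\_logit}}
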
diff --git a/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py b/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py
index 957f32a..331e973 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py
@@ -25,7 +25,7 @@ Reference:
 import logging
 import tensorflow as tf
 from submarine.ml.model.base_tf_model import BaseTFModel
-from submarine.ml.layers.core import dnn_layer, bilinear_layer, linear_layer
+from submarine.ml.layers.core import dnn_layer, bilinear_layer, linear_layer, embedding_layer
 from submarine.utils.tf_utils import get_estimator_spec
 
 logger = logging.getLogger(__name__)
@@ -36,7 +36,8 @@ class NFM(BaseTFModel):
         super().model_fn(features, labels, mode, params)
 
         linear_logit = linear_layer(features, **params['training'])
-        deep_inputs = bilinear_layer(features, **params['training'])
+        embedding_outputs = embedding_layer(features, **params['training'])
+        deep_inputs = bilinear_layer(embedding_outputs, **params['training'])
         deep_logit = dnn_layer(deep_inputs, mode,  **params['training'])
 
         with tf.variable_scope("NFM_out"):
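
Note that after this refactor bilinear_layer (used here) and fm_layer (used
in FM/DeepFM) share the same sum-square computation; they differ only in the
final reduction. Assuming, for illustration, batch_size=32, field_size=10,
embedding_size=16:

    # embedding_layer(features, ...)   -> [32, 10, 16]
    # bilinear_layer(embeddings, ...)  -> [32, 16]  per-example vector, fed to dnn_layer
    # fm_layer(embeddings, ...)        -> [32]      per-example scalar logit
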

