Posted to dev@submarine.apache.org by pi...@apache.org on 2020/03/29 09:52:04 UTC
[submarine] branch master updated: SUBMARINE-450. [SDK] Refactor ML model codebase
This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 43fe754 SUBMARINE-450. [SDK] Refactor ML model codebase
43fe754 is described below
commit 43fe75440aa1d595c1630220d7ce145ddf7efac4
Author: pingsutw <pi...@gmail.com>
AuthorDate: Tue Mar 24 04:17:14 2020 +0800
SUBMARINE-450. [SDK] Refactor ML model codebase
### What is this PR for?
Refactor deepfm.py and fm.py to reuse the shared layer functions in submarine/ml/layers/core.py (linear_layer, embedding_layer, fm_layer, dnn_layer) instead of duplicating the graph-building code in each model.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-450
### How should this be tested?
https://travis-ci.org/github/pingsutw/hadoop-submarine/builds/666228791
https://github.com/pingsutw/hadoop-submarine/actions/runs/62059056
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: pingsutw <pi...@gmail.com>
Closes #246 from pingsutw/SUBMARINE-450 and squashes the following commits:
ce57041 [pingsutw] SUBMARINE-450. [SDK] Refactor ML model codebase
---
.../pysubmarine/submarine/ml/layers/core.py | 55 +++++++++++-----
.../pysubmarine/submarine/ml/model/deepfm.py | 76 ++++------------------
submarine-sdk/pysubmarine/submarine/ml/model/fm.py | 42 +++---------
.../pysubmarine/submarine/ml/model/nfm.py | 5 +-
4 files changed, 61 insertions(+), 117 deletions(-)
diff --git a/submarine-sdk/pysubmarine/submarine/ml/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/layers/core.py
index 9a52024..bf72185 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/layers/core.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/layers/core.py
@@ -26,11 +26,11 @@ def batch_norm_layer(x, train_phase, scope_bn, batch_norm_decay):
return tf.cond(tf.cast(train_phase, tf.bool), lambda: bn_train, lambda: bn_infer)
-def dnn_layer(deep_inputs, estimator_mode, batch_norm, deep_layers, dropout, batch_norm_decay=0.9,
+def dnn_layer(inputs, estimator_mode, batch_norm, deep_layers, dropout, batch_norm_decay=0.9,
l2_reg=0, **kwargs):
"""
The Multi-Layer Perceptron
- :param deep_inputs: A tensor of at least rank 2 and static value for the last dimension; i.e.
+ :param inputs: A tensor of at least rank 2 and static value for the last dimension; i.e.
[batch_size, depth], [None, None, None, channels].
:param estimator_mode: Standard names for Estimator model modes. `TRAIN`, `EVAL`, `PREDICT`
:param batch_norm: Whether to use BatchNormalization before activation or not.
@@ -51,7 +51,7 @@ def dnn_layer(deep_inputs, estimator_mode, batch_norm, deep_layers, dropout, bat
for i in range(len(deep_layers)):
deep_inputs = tf.contrib.layers.fully_connected(
- inputs=deep_inputs, num_outputs=deep_layers[i],
+ inputs=inputs, num_outputs=deep_layers[i],
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
scope='mlp%d' % i)
if batch_norm:
@@ -66,7 +66,7 @@ def dnn_layer(deep_inputs, estimator_mode, batch_norm, deep_layers, dropout, bat
weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
scope='deep_out')
deep_out = tf.reshape(deep_out, shape=[-1])
- return deep_out
+ return deep_out
def linear_layer(features, feature_size, field_size, l2_reg=0, **kwargs):
@@ -91,15 +91,15 @@ def linear_layer(features, feature_size, field_size, l2_reg=0, **kwargs):
initializer=tf.glorot_normal_initializer(),
regularizer=regularizer)
- feat_weights = tf.nn.embedding_lookup(linear_weight, feat_ids)
- linear_out = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1) + linear_bias
+ feat_weights = tf.nn.embedding_lookup(linear_weight, feat_ids)
+ linear_out = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1) + linear_bias
return linear_out
-def bilinear_layer(features, feature_size, field_size, embedding_size, l2_reg=0, **kwargs):
+def embedding_layer(features, feature_size, field_size, embedding_size, l2_reg=0, **kwargs):
"""
- Bi-Interaction Layer used in Neural FM,compress the pairwise element-wise product of features
- into one single vector.
+ Turns positive integers (indexes) into dense vectors of fixed size.
+ e.g. [[4], [20]] -> [[0.25, 0.1], [0.6, -0.2]]
:param features: input features
:param feature_size: size of features
:param field_size: number of fields in the features
@@ -112,17 +112,40 @@ def bilinear_layer(features, feature_size, field_size, embedding_size, l2_reg=0,
feat_vals = features['feat_vals']
feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
- with tf.variable_scope("BilinearLayer_Layer"):
+ with tf.variable_scope("Embedding_Layer"):
regularizer = tf.contrib.layers.l2_regularizer(l2_reg)
embedding_dict = tf.get_variable(name='embedding_dict',
shape=[feature_size, embedding_size],
initializer=tf.glorot_normal_initializer(),
regularizer=regularizer)
+ embeddings = tf.nn.embedding_lookup(embedding_dict, feat_ids)
+ feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
+ embedding_out = tf.multiply(embeddings, feat_vals)
+ return embedding_out
+
+
+def bilinear_layer(inputs, **kwargs):
+ """
+ Bi-Interaction Layer used in Neural FM; compresses the pairwise element-wise product of features
+ into one single vector.
+ :param inputs: input features
+ """
- embeddings = tf.nn.embedding_lookup(embedding_dict, feat_ids)
- feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
- embeddings = tf.multiply(embeddings, feat_vals)
- sum_square = tf.square(tf.reduce_sum(embeddings, 1))
- square_sum = tf.reduce_sum(tf.square(embeddings), 1)
- bilinear_out = 0.5 * tf.subtract(sum_square, square_sum)
+ with tf.variable_scope("BilinearLayer_Layer"):
+ sum_square = tf.square(tf.reduce_sum(inputs, 1))
+ square_sum = tf.reduce_sum(tf.square(inputs), 1)
+ bilinear_out = 0.5 * tf.subtract(sum_square, square_sum)
return bilinear_out
+
+
+def fm_layer(inputs, **kwargs):
+ """
+ Factorization Machine models pairwise (order-2) feature interactions
+ without linear term and bias.
+ :param inputs: input features
+ """
+ with tf.variable_scope("FM_Layer"):
+ sum_square = tf.square(tf.reduce_sum(inputs, 1))
+ square_sum = tf.reduce_sum(tf.square(inputs), 1)
+ fm_out = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)
+ return fm_out
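For context (not part of this commit): bilinear_layer and fm_layer both rely on the standard FM identity that the sum of pairwise element-wise products of embeddings can be computed from squares, i.e. 0.5 * ((sum_i e_i)^2 - sum_i e_i^2) = sum_{i<j} e_i * e_j. A minimal NumPy sketch, with made-up shapes, that checks this numerically:

    # Illustration only; shapes are arbitrary, not pysubmarine defaults.
    import numpy as np

    rng = np.random.default_rng(0)
    emb = rng.normal(size=(3, 7, 4))           # [batch, field_size, embedding_size]

    # Vectorized form used in bilinear_layer / fm_layer above.
    sum_square = np.square(emb.sum(axis=1))    # (sum_i e_i)^2
    square_sum = np.square(emb).sum(axis=1)    # sum_i e_i^2
    bilinear = 0.5 * (sum_square - square_sum) # [batch, embedding_size]

    # Naive pairwise form it is equivalent to.
    n = emb.shape[1]
    pairwise = sum(emb[:, i] * emb[:, j] for i in range(n) for j in range(i + 1, n))
    assert np.allclose(bilinear, pairwise)

    # fm_layer additionally reduces over the embedding dimension to one logit per example.
    fm_out = bilinear.sum(axis=1)              # [batch]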
diff --git a/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py
index 9c6d506..ff7dd76 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/model/deepfm.py
@@ -27,9 +27,8 @@ Reference:
import logging
import tensorflow as tf
-import numpy as np
from submarine.ml.model.base_tf_model import BaseTFModel
-from submarine.ml.layers.core import batch_norm_layer
+from submarine.ml.layers.core import fm_layer, linear_layer, dnn_layer, embedding_layer
from submarine.utils.tf_utils import get_estimator_spec
logger = logging.getLogger(__name__)
@@ -37,72 +36,19 @@ logger = logging.getLogger(__name__)
class DeepFM(BaseTFModel):
def model_fn(self, features, labels, mode, params):
- field_size = params["training"]["field_size"]
- feature_size = params["training"]["feature_size"]
- embedding_size = params["training"]["embedding_size"]
- l2_reg = params["training"]["l2_reg"]
- batch_norm = params["training"]["batch_norm"]
- batch_norm_decay = params["training"]["batch_norm_decay"]
- seed = params["training"]["seed"]
- layers = params["training"]["deep_layers"]
- dropout = params["training"]["dropout"]
+ super().model_fn(features, labels, mode, params)
- np.random.seed(seed)
- tf.set_random_seed(seed)
+ linear_logit = linear_layer(features, **params['training'])
- fm_bias = tf.get_variable(name='fm_bias', shape=[1],
- initializer=tf.constant_initializer(0.0))
- fm_weight = tf.get_variable(name='fm_weight', shape=[feature_size],
- initializer=tf.glorot_normal_initializer())
- fm_vector = tf.get_variable(name='fm_vector', shape=[feature_size, embedding_size],
- initializer=tf.glorot_normal_initializer())
+ embedding_outputs = embedding_layer(features, **params['training'])
+ fm_logit = fm_layer(embedding_outputs, **params['training'])
- with tf.variable_scope("Feature"):
- feat_ids = features['feat_ids']
- feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
- feat_vals = features['feat_vals']
- feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
+ field_size = params['training']['field_size']
+ embedding_size = params['training']['embedding_size']
+ deep_inputs = tf.reshape(embedding_outputs, shape=[-1, field_size * embedding_size])
+ deep_logit = dnn_layer(deep_inputs, mode, **params['training'])
- with tf.variable_scope("First_order"):
- feat_weights = tf.nn.embedding_lookup(fm_weight, feat_ids)
- y_w = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1)
-
- with tf.variable_scope("Second_order"):
- embeddings = tf.nn.embedding_lookup(fm_vector, feat_ids)
- feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
- embeddings = tf.multiply(embeddings, feat_vals)
- sum_square = tf.square(tf.reduce_sum(embeddings, 1))
- square_sum = tf.reduce_sum(tf.square(embeddings), 1)
- y_v = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)
-
- with tf.variable_scope("Deep-part"):
- if batch_norm:
- if mode == tf.estimator.ModeKeys.TRAIN:
- train_phase = True
- else:
- train_phase = False
-
- deep_inputs = tf.reshape(embeddings, shape=[-1, field_size * embedding_size])
- for i in range(len(layers)):
- deep_inputs = tf.contrib.layers.fully_connected(
- inputs=deep_inputs, num_outputs=layers[i],
- weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
- scope='mlp%d' % i)
- if batch_norm:
- deep_inputs = batch_norm_layer(
- deep_inputs, train_phase=train_phase,
- scope_bn='bn_%d' % i, batch_norm_decay=batch_norm_decay)
- if mode == tf.estimator.ModeKeys.TRAIN:
- deep_inputs = tf.nn.dropout(deep_inputs, keep_prob=dropout[i])
-
- y_deep = tf.contrib.layers.fully_connected(
- inputs=deep_inputs, num_outputs=1, activation_fn=tf.identity,
- weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg),
- scope='deep_out')
- y_d = tf.reshape(y_deep, shape=[-1])
-
- with tf.variable_scope("DeepFM-out"):
- y_bias = fm_bias * tf.ones_like(y_d, dtype=tf.float32)
- logit = y_bias + y_w + y_v + y_d
+ with tf.variable_scope("DeepFM_out"):
+ logit = linear_logit + fm_logit + deep_logit
return get_estimator_spec(logit, labels, mode, params)
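To make the new control flow concrete: after this change DeepFM's model_fn is just a sum of three logits produced by the shared layers. A hedged NumPy shape sketch (illustration only; batch and field sizes below are made up, and the zero tensors stand in for the real layer outputs):

    # Not pysubmarine code; traces the tensor shapes implied by the reshapes above.
    import numpy as np

    batch, field_size, embedding_size = 8, 39, 16

    embedding_out = np.zeros((batch, field_size, embedding_size))  # embedding_layer
    linear_logit = np.zeros(batch)                                 # linear_layer
    fm_logit = 0.5 * (np.square(embedding_out.sum(1))
                      - np.square(embedding_out).sum(1)).sum(1)    # fm_layer -> [batch]

    # The deep part flattens the embeddings before the MLP, as in model_fn above.
    deep_inputs = embedding_out.reshape(-1, field_size * embedding_size)
    assert deep_inputs.shape == (batch, field_size * embedding_size)
    deep_logit = np.zeros(batch)                                   # dnn_layer -> [batch]

    logit = linear_logit + fm_logit + deep_logit                   # "DeepFM_out"
    assert logit.shape == (batch,)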
diff --git a/submarine-sdk/pysubmarine/submarine/ml/model/fm.py b/submarine-sdk/pysubmarine/submarine/ml/model/fm.py
index 19e4c65..4dc82db 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/model/fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/model/fm.py
@@ -23,7 +23,7 @@ Reference:
import logging
import tensorflow as tf
-import numpy as np
+from submarine.ml.layers.core import linear_layer, fm_layer, embedding_layer
from submarine.ml.model.base_tf_model import BaseTFModel
from submarine.utils.tf_utils import get_estimator_spec
@@ -32,39 +32,13 @@ logger = logging.getLogger(__name__)
class FM(BaseTFModel):
def model_fn(self, features, labels, mode, params):
- field_size = params["training"]["field_size"]
- feature_size = params["training"]["feature_size"]
- embedding_size = params["training"]["embedding_size"]
- seed = params["training"]["seed"]
+ super().model_fn(features, labels, mode, params)
- np.random.seed(seed)
- tf.set_random_seed(seed)
+ linear_logit = linear_layer(features, **params['training'])
+ embedding_outputs = embedding_layer(features, **params['training'])
+ fm_logit = fm_layer(embedding_outputs, **params['training'])
- fm_bias = tf.get_variable(name='fm_bias', shape=[1],
- initializer=tf.constant_initializer(0.0))
- fm_weight = tf.get_variable(name='fm_weight', shape=[feature_size],
- initializer=tf.glorot_normal_initializer())
- fm_vector = tf.get_variable(name='fm_vector', shape=[feature_size, embedding_size],
- initializer=tf.glorot_normal_initializer())
+ with tf.variable_scope("FM_out"):
+ logit = linear_logit + fm_logit
- with tf.variable_scope("Feature"):
- feat_ids = features['feat_ids']
- feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
- feat_vals = features['feat_vals']
- feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
-
- with tf.variable_scope("First_order"):
- feat_weights = tf.nn.embedding_lookup(fm_weight, feat_ids)
- y_w = tf.reduce_sum(tf.multiply(feat_weights, feat_vals), 1)
-
- with tf.variable_scope("Second_order"):
- embeddings = tf.nn.embedding_lookup(fm_vector, feat_ids)
- feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
- embeddings = tf.multiply(embeddings, feat_vals)
- sum_square = tf.square(tf.reduce_sum(embeddings, 1))
- square_sum = tf.reduce_sum(tf.square(embeddings), 1)
- y_v = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)
-
- y = fm_bias + y_w + y_v
-
- return get_estimator_spec(y, labels, mode, params)
+ return get_estimator_spec(logit, labels, mode, params)
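The refactored FM model_fn assembles the standard order-2 factorization machine prediction (Rendle, 2010), with linear_layer supplying the bias and linear term and fm_layer the pairwise term; as a math sketch:

    % Mapping of the FM equation onto the two logits summed in "FM_out" above.
    \[
      \hat{y}(x) \;=\;
      \underbrace{w_0 + \sum_{i=1}^{n} w_i x_i}_{\texttt{linear\_logit}}
      \;+\;
      \underbrace{\sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle v_i, v_j \rangle\, x_i x_j}_{\texttt{fm\_logit}}
    \]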
diff --git a/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py b/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py
index 957f32a..331e973 100644
--- a/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/model/nfm.py
@@ -25,7 +25,7 @@ Reference:
import logging
import tensorflow as tf
from submarine.ml.model.base_tf_model import BaseTFModel
-from submarine.ml.layers.core import dnn_layer, bilinear_layer, linear_layer
+from submarine.ml.layers.core import dnn_layer, bilinear_layer, linear_layer, embedding_layer
from submarine.utils.tf_utils import get_estimator_spec
logger = logging.getLogger(__name__)
@@ -36,7 +36,8 @@ class NFM(BaseTFModel):
super().model_fn(features, labels, mode, params)
linear_logit = linear_layer(features, **params['training'])
- deep_inputs = bilinear_layer(features, **params['training'])
+ embedding_outputs = embedding_layer(features, **params['training'])
+ deep_inputs = bilinear_layer(embedding_outputs, **params['training'])
deep_logit = dnn_layer(deep_inputs, mode, **params['training'])
with tf.variable_scope("NFM_out"):
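One point this NFM change depends on, sketched with assumed shapes (illustration only, not pysubmarine code): bilinear_layer keeps a vector per example, which becomes deep_inputs for dnn_layer, whereas fm_layer (used by FM and DeepFM) reduces that vector to a scalar logit.

    import numpy as np

    emb = np.ones((2, 5, 3))  # [batch, field_size, embedding_size]
    bilinear = 0.5 * (np.square(emb.sum(1)) - np.square(emb).sum(1))

    print(bilinear.shape)         # (2, 3) -> deep_inputs for dnn_layer in NFM
    print(bilinear.sum(1).shape)  # (2,)   -> fm_logit in FM / DeepFM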