Posted to issues@flink.apache.org by "zhipeng93 (via GitHub)" <gi...@apache.org> on 2023/03/09 02:29:25 UTC

[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #220: [FLINK-31325] Improve performance of Swing

zhipeng93 commented on code in PR #220:
URL: https://github.com/apache/flink-ml/pull/220#discussion_r1130294319


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##########
@@ -140,6 +141,7 @@ public void testParam() {
         assertEquals(5, swing.getAlpha1());
         assertEquals(1, swing.getAlpha2());
         assertEquals(0.35, swing.getBeta(), 1e-9);
+        assertEquals(1, swing.getSeed());

Review Comment:
   Let's also check the default value of `seed`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -274,13 +272,29 @@ private static class ComputingSimilarItems extends AbstractStreamOperator<Row>
         private static final Character commaDelimiter = ',';
         private static final Character semicolonDelimiter = ';';
 
+        private final Random random;
+
+        private Map<Long, long[]> userAndPurchasedItems = new HashMap<>();
+        private Map<Long, List<Long>> itemAndPurchasers = new HashMap<>();
+
+        private ListState<Map<Long, long[]>> userAndPurchasedItemsState;
+        private ListState<Map<Long, List<Long>>> itemAndPurchasersState;
+
         private ComputingSimilarItems(
-                int k, int maxUserNumPerItem, int alpha1, int alpha2, double beta) {
+                int k,
+                int maxUserNumPerItem,
+                int maxUserBehavior,
+                int alpha1,
+                int alpha2,
+                double beta,
+                long seed) {
             this.k = k;
             this.maxUserNumPerItem = maxUserNumPerItem;
+            this.maxUserBehavior = maxUserBehavior;
             this.alpha1 = alpha1;
             this.alpha2 = alpha2;
             this.beta = beta;
+            this.random = new Random(seed);

Review Comment:
   Is there a test case that uses a non-default seed? I would expect different seeds to lead to different outputs, but I did not find such a test.
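For context, the removed `sampleUserList` called `Collections.shuffle(userList)`, which draws from an unseeded source, so its output could not be pinned in a test. Below is a minimal sketch of what seeding makes testable. It assumes a hypothetical helper `sampleUsers` (not part of Flink ML) that passes `new Random(seed)` to `Collections.shuffle`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class SeedDeterminismCheck {
    // Hypothetical helper, not Flink ML API: seeded shuffle-and-truncate sampling.
    static List<Long> sampleUsers(List<Long> users, int sampleSize, long seed) {
        List<Long> copy = new ArrayList<>(users);
        if (copy.size() <= sampleSize) {
            return copy; // nothing to sample away
        }
        Collections.shuffle(copy, new Random(seed));
        return new ArrayList<>(copy.subList(0, sampleSize));
    }

    public static void main(String[] args) {
        List<Long> users = new ArrayList<>();
        for (long u = 0; u < 1000; u++) {
            users.add(u);
        }
        List<Long> first = sampleUsers(users, 10, 1L);
        List<Long> again = sampleUsers(users, 10, 1L);
        List<Long> other = sampleUsers(users, 10, 2L);
        // Same seed: the sample is reproducible, so a test can pin the exact output.
        System.out.println("seed 1 twice equal: " + first.equals(again));
        // Different seed: on a large enough input the sample is expected to change.
        System.out.println("seed 1 vs seed 2 equal: " + first.equals(other));
    }
}
```

A test for the new `seed` parameter could follow the same pattern. First, run Swing twice with the same seed and assert identical output. Then run it with a different seed and assert that the output changes. The second run should use an input where `maxUserNumPerItem` actually truncates some purchaser lists, because sampling only happens in that case.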



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -289,36 +303,40 @@ public void endInput() throws Exception {
             Map<Long, Double> userWeights = new HashMap<>(userAndPurchasedItems.size());
             userAndPurchasedItems.forEach(
                     (k, v) -> {
-                        int count = v.size();
+                        int count = v.length;
                         userWeights.put(k, calculateWeight(count));
                     });
 
             for (long mainItem : itemAndPurchasers.keySet()) {
-                List<Long> userList =
-                        sampleUserList(itemAndPurchasers.get(mainItem), maxUserNumPerItem);
+                List<Long> userList = new ArrayList<>(itemAndPurchasers.get(mainItem));

Review Comment:
   Is creating a new list (which introduces an extra object copy) necessary?
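As far as the diff shows, `userList` is only read inside the loop. The loop also starts at index 1, which suggests that index 0 of the stored list holds a bookkeeping counter (compare `purchasers.get(0)` in `processElement`). If that is correct, the stored list can be read in place. Below is a minimal sketch using a hypothetical `forEachPurchaser` helper:

```java
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

final class NoCopyIteration {
    /**
     * Hypothetical helper: visits every sampled purchaser of an item without copying the list.
     * Assumes index 0 holds a counter, so users start at index 1.
     */
    static void forEachPurchaser(
            Map<Long, List<Long>> itemAndPurchasers, long mainItem, LongConsumer action) {
        List<Long> userList = itemAndPurchasers.get(mainItem); // read in place, no new ArrayList
        for (int i = 1; i < userList.size(); i++) {
            action.accept(userList.get(i));
        }
    }
}
```

A copy would only be needed if the loop mutated the list, or if the list could change concurrently. Within `endInput`, neither appears to be the case.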



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -289,36 +303,40 @@ public void endInput() throws Exception {
             Map<Long, Double> userWeights = new HashMap<>(userAndPurchasedItems.size());
             userAndPurchasedItems.forEach(
                     (k, v) -> {
-                        int count = v.size();
+                        int count = v.length;
                         userWeights.put(k, calculateWeight(count));
                     });
 
             for (long mainItem : itemAndPurchasers.keySet()) {
-                List<Long> userList =
-                        sampleUserList(itemAndPurchasers.get(mainItem), maxUserNumPerItem);
+                List<Long> userList = new ArrayList<>(itemAndPurchasers.get(mainItem));
+                long[] interaction = new long[maxUserBehavior];
                 HashMap<Long, Double> id2swing = new HashMap<>();
 
-                for (int i = 0; i < userList.size(); i++) {
+                for (int i = 1; i < userList.size(); i++) {
                     long u = userList.get(i);
+                    int interactionSize = 0;

Review Comment:
   nit: The initializer `0` of variable `interactionSize` is redundant.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -289,36 +303,40 @@ public void endInput() throws Exception {
             Map<Long, Double> userWeights = new HashMap<>(userAndPurchasedItems.size());
             userAndPurchasedItems.forEach(
                     (k, v) -> {
-                        int count = v.size();
+                        int count = v.length;
                         userWeights.put(k, calculateWeight(count));
                     });
 
             for (long mainItem : itemAndPurchasers.keySet()) {
-                List<Long> userList =
-                        sampleUserList(itemAndPurchasers.get(mainItem), maxUserNumPerItem);
+                List<Long> userList = new ArrayList<>(itemAndPurchasers.get(mainItem));
+                long[] interaction = new long[maxUserBehavior];

Review Comment:
   nit: we can move this allocation outside the loop for efficiency, so that the array is not re-allocated for every item.
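`calaculateCommonItems` returns the number of entries it wrote. Stale values left in the buffer from a previous item are therefore never read, so a single buffer can be reused across all items. Below is a minimal sketch of the hoisted allocation using a hypothetical `commonCounts` driver. It assumes, as `new long[maxUserBehavior]` implies, that no user's sorted behavior array is longer than `maxUserBehavior`:

```java
final class HoistedBuffer {
    /** Two-pointer intersection of two sorted arrays; writes matches into buffer, returns the count. */
    static int intersect(long[] u, long[] v, long[] buffer) {
        int i = 0;
        int j = 0;
        int size = 0;
        while (i < u.length && j < v.length) {
            if (u[i] == v[j]) {
                buffer[size++] = u[i];
                i++;
                j++;
            } else if (u[i] < v[j]) {
                i++;
            } else {
                j++;
            }
        }
        return size;
    }

    /**
     * Hypothetical driver: common-item counts between mainUser and each other user.
     * The buffer is allocated once, outside the loop. Every array must be sorted and hold
     * at most maxUserBehavior items, which bounds any intersection size.
     */
    static int[] commonCounts(long[] mainUser, long[][] others, int maxUserBehavior) {
        long[] buffer = new long[maxUserBehavior];
        int[] counts = new int[others.length];
        for (int k = 0; k < others.length; k++) {
            // No need to clear the buffer: only the first counts[k] entries are valid.
            counts[k] = intersect(mainUser, others[k], buffer);
        }
        return counts;
    }
}
```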



##########
flink-ml-python/pyflink/ml/recommendation/tests/test_swing.py:
##########
@@ -104,6 +105,7 @@ def test_param(self):
         self.assertEqual(5, swing.alpha1)
         self.assertEqual(1, swing.alpha2)
         self.assertAlmostEqual(0.35, swing.beta, delta=1e-9)
+        self.assertEqual(1, swing.seed)

Review Comment:
   Let's also check the default value of `seed`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -350,32 +368,52 @@ private double calculateWeight(int size) {
             return (1.0 / Math.pow(alpha1 + size, beta));
         }
 
-        private static List<Long> sampleUserList(Map<Long, String> allUsers, int sampleSize) {
-            int totalSize = allUsers.size();
-            List<Long> userList = new ArrayList<>(allUsers.keySet());
-
-            if (totalSize < sampleSize) {
-                return userList;
+        private static int calaculateCommonItems(long[] u, long[] v, long[] interaction) {
+            int pointerU = 0;
+            int pointerV = 0;
+            int interactionSize = 0;
+            while (pointerU < u.length && pointerV < v.length) {
+                if (u[pointerU] == v[pointerV]) {
+                    interaction[interactionSize++] = u[pointerU];
+                    pointerU++;
+                    pointerV++;
+                } else if (u[pointerU] < v[pointerV]) {
+                    pointerU++;
+                } else {
+                    pointerV++;
+                }
             }
-
-            Collections.shuffle(userList);
-            return userList.subList(0, sampleSize);
+            return interactionSize;
         }
 
         @Override
         public void processElement(StreamRecord<Tuple3<Long, Long, long[]>> streamRecord)
                 throws Exception {
             Tuple3<Long, Long, long[]> tuple3 = streamRecord.getValue();
             long user = tuple3.f0;
+            long[] userBehavior = tuple3.f2;
             long mainItem = tuple3.f1;
-            Map<Long, String> items = new HashMap<>();
-            for (long item : tuple3.f2) {
-                items.put(item, null);
+
+            if (!userAndPurchasedItems.containsKey(user)) {
+                Arrays.sort(userBehavior);
+                userAndPurchasedItems.put(user, userBehavior);
             }
 
-            userAndPurchasedItems.putIfAbsent(user, items);
-            itemAndPurchasers.putIfAbsent(mainItem, new HashMap<>());
-            itemAndPurchasers.get(mainItem).putIfAbsent(user, null);
+            itemAndPurchasers.putIfAbsent(mainItem, new ArrayList<>());
+            List<Long> purchasers = itemAndPurchasers.get(mainItem);
+            if (purchasers.size() == 0) {
+                purchasers.add(0L);
+            }
+            long total = purchasers.get(0);
+            if (purchasers.size() <= maxUserNumPerItem) {

Review Comment:
   nit: Let's add a comment here to explain the sampling method.
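The hunk is cut off at the capacity check. The visible lines store a running total in `purchasers.get(0)` and append users only while `purchasers.size() <= maxUserNumPerItem`, which looks like reservoir sampling. Assuming that is the intent, the comment could describe Algorithm R: the n-th user seen for an item is kept with probability `maxUserNumPerItem / n` and replaces a uniformly chosen sampled user. Below is a minimal sketch with a hypothetical `addPurchaser` helper. Whether the real code increments the counter before or after the check is not visible in the diff.

```java
import java.util.List;
import java.util.Random;

final class PurchaserReservoir {
    /**
     * Hypothetical helper: reservoir-samples at most maxUserNumPerItem users per item.
     * Layout assumption: index 0 counts users seen so far; sampled users follow it.
     */
    static void addPurchaser(
            List<Long> purchasers, long user, int maxUserNumPerItem, Random random) {
        if (purchasers.isEmpty()) {
            purchasers.add(0L); // counter slot
        }
        int total = Math.toIntExact(purchasers.get(0) + 1); // users seen, including this one
        purchasers.set(0, (long) total);
        if (purchasers.size() <= maxUserNumPerItem) {
            // Fewer than maxUserNumPerItem users sampled (size includes the counter): keep it.
            purchasers.add(user);
        } else {
            // Keep this user with probability maxUserNumPerItem / total.
            int slot = random.nextInt(total); // uniform in [0, total)
            if (slot < maxUserNumPerItem) {
                purchasers.set(slot + 1, user); // +1 skips the counter slot
            }
        }
    }
}
```

Seeding `random` from the new `seed` parameter, as the constructor change does, makes the sample reproducible for a given input order.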



-- 
This is an automated message from the Apache Git Service.